Merge pull request #595 from eqcorrscan/issue_592
Fix short data for lag-calc error
calum-chamberlain authored Dec 11, 2024
2 parents 8fd8387 + c6ae0b1 commit c5811b2
Showing 10 changed files with 103 additions and 80 deletions.
1 change: 1 addition & 0 deletions .github/test_conda_env.yml
@@ -3,6 +3,7 @@ channels:
   - conda-forge
   - defaults
 dependencies:
+  - conda-forge::compilers
   - numpy>=1.12
   - matplotlib>=1.3.0
   - scipy
36 changes: 0 additions & 36 deletions .github/test_conda_env_macOS.yml

This file was deleted.

49 changes: 25 additions & 24 deletions .github/workflows/runtest.yml
@@ -13,26 +13,27 @@ jobs:
     # continue-on-error: true

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

-      - name: Get conda env file
-        shell: bash -l {0}
-        run: |
-          if [ "$RUNNER_OS" == "macOS" ]; then
-            cp .github/test_conda_env_macOS.yml .github/test_conda_env.yml
-          fi
+      # Not needed as using the generic "compilers"
+      # - name: Get conda env file
+      #   shell: bash -l {0}
+      #   run: |
+      #     if [ "$RUNNER_OS" == "macOS" ]; then
+      #       cp .github/test_conda_env_macOS.yml .github/test_conda_env.yml
+      #     fi

       - name: Setup conda
-        uses: conda-incubator/setup-miniconda@v2.1.1
+        uses: conda-incubator/setup-miniconda@v3
         with:
-          miniforge-variant: Mambaforge
-          miniforge-version: latest
+          # miniforge-variant: Miniforge
+          # miniforge-version: latest
           python-version: ${{ matrix.python-version }}
           activate-environment: eqcorrscan-test
-          use-mamba: true
+          # use-mamba: true

       - name: Update Env
-        run: mamba env update -n eqcorrscan-test -f .github/test_conda_env.yml
+        run: conda env update -n eqcorrscan-test -f .github/test_conda_env.yml

       - name: install eqcorrscan
         shell: bash -l {0}
@@ -90,19 +91,19 @@ jobs:
       fail-fast: false

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

       - name: Setup conda
-        uses: conda-incubator/setup-miniconda@v2.1.1
+        uses: conda-incubator/setup-miniconda@v3
         with:
-          miniforge-variant: Mambaforge
-          miniforge-version: latest
+          # miniforge-variant: Mambaforge
+          # miniforge-version: latest
           python-version: ${{ matrix.python-version }}
           activate-environment: eqcorrscan-test
-          use-mamba: true
+          # use-mamba: true

       - name: Update Env
-        run: mamba env update -n eqcorrscan-test -f .github/test_conda_env.yml
+        run: conda env update -n eqcorrscan-test -f .github/test_conda_env.yml

       - name: install eqcorrscan
         shell: bash -l {0}
@@ -148,19 +149,19 @@ jobs:
       fail-fast: false

     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

       - name: Setup conda
-        uses: conda-incubator/setup-miniconda@v2.1.1
+        uses: conda-incubator/setup-miniconda@v3
        with:
-          miniforge-variant: Mambaforge
-          miniforge-version: latest
+          # miniforge-variant: Mambaforge
+          # miniforge-version: latest
           python-version: ${{ matrix.python-version }}
           activate-environment: eqcorrscan-test
-          use-mamba: true
+          # use-mamba: true

       - name: Update Env
-        run: mamba env update -n eqcorrscan-test -f .github/test_conda_env.yml
+        run: conda env update -n eqcorrscan-test -f .github/test_conda_env.yml

       - name: install fmf
         shell: bash -l {0}
4 changes: 2 additions & 2 deletions eqcorrscan/core/lag_calc.py
@@ -361,10 +361,10 @@ def xcorr_pick_family(family, stream, shift_len=0.2, min_cc=0.4,
         phase = phase_hints.get(tr.id, None)
         if phase is None and stachan.channel[1][-1] in vertical_chans:
             phase = 'P'
-            Logger.warning(f"Unknown phase hint for {tr.id} - assigning P")
+            Logger.debug(f"Unknown phase hint for {tr.id} - assigning P")
         elif phase is None and stachan.channel[1][-1] in horizontal_chans:
             phase = 'S'
-            Logger.warning(f"Unknown phase hint for {tr.id} - assigning S")
+            Logger.debug(f"Unknown phase hint for {tr.id} - assigning S")
         _waveform_id = WaveformStreamID(seed_string=tr.id)
         event.picks.append(Pick(
             waveform_id=_waveform_id, time=picktime,
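The hunk above only demotes two log messages, but the fallback it sits in is easy to restate: when no phase hint is known for a trace, the channel orientation code decides the phase. A minimal standalone sketch of that rule; the helper name and the orientation-code sets are illustrative assumptions, not EQcorrscan's API:

import logging

Logger = logging.getLogger(__name__)

VERTICAL_CHANS = ("Z",)                  # assumed orientation codes
HORIZONTAL_CHANS = ("E", "N", "1", "2")

def guess_phase(trace_id, phase_hints):
    """Return a phase hint for trace_id, guessing from orientation."""
    phase = phase_hints.get(trace_id)
    if phase is not None:
        return phase
    orientation = trace_id.split(".")[-1][-1]   # last char of channel code
    if orientation in VERTICAL_CHANS:
        Logger.debug(f"Unknown phase hint for {trace_id} - assigning P")
        return "P"
    if orientation in HORIZONTAL_CHANS:
        Logger.debug(f"Unknown phase hint for {trace_id} - assigning S")
        return "S"
    return None

print(guess_phase("NZ.WEL.10.HHZ", {}))   # vertical channel -> "P"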
6 changes: 3 additions & 3 deletions eqcorrscan/core/match_filter/detection.py
@@ -301,7 +301,7 @@ def _calculate_event(self, template=None, template_st=None,
                     template_pick,
                     key=lambda p: p.time)[_index].phase_hint
             except IndexError:
-                Logger.error(f"No pick for trace: {tr.id}")
+                Logger.debug(f"No pick for trace: {tr.id}")
             ev.picks.append(new_pick)
         if estimate_origin and template is not None\
                 and template.event is not None:
@@ -397,7 +397,7 @@ def extract_stream(self, stream, length, prepick, all_vert=False,
                 pick = [p for p in pick
                         if p.waveform_id.channel_code == channel]
                 if len(pick) == 0:
-                    Logger.info(
+                    Logger.debug(
                         "No pick for {0}.{1}".format(station, channel))
                     continue
                 elif len(pick) > 1:
@@ -430,7 +430,7 @@ def extract_stream(self, stream, length, prepick, all_vert=False,
                         length) < tr.stats.delta:
                     cut_stream += _tr
                 else:
-                    Logger.info(
+                    Logger.debug(
                         "Insufficient data length for {0}".format(tr.id))
         return cut_stream
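Both hunks above demote expected conditions (unpicked traces, short data) from error/info to debug. A loose, hypothetical sketch of the pick-lookup fallback in _calculate_event, keeping the same IndexError guard; the nearest-in-time heuristic and all names here are assumptions for illustration, not the library's exact logic:

from obspy import UTCDateTime
from obspy.core.event import Pick, WaveformStreamID

template_picks = [
    Pick(time=UTCDateTime(2024, 1, 1, 0, 0, 1),
         waveform_id=WaveformStreamID(seed_string="NZ.WEL.10.HHZ"),
         phase_hint="P"),
]

def nearest_phase_hint(picks, trace_time):
    """Phase hint of the pick nearest to trace_time, or None if unpicked."""
    try:
        return sorted(
            picks, key=lambda p: abs(p.time - trace_time))[0].phase_hint
    except IndexError:
        # In the library this path is now Logger.debug("No pick for trace...")
        return None

print(nearest_phase_hint(template_picks, UTCDateTime(2024, 1, 1)))  # "P"
print(nearest_phase_hint([], UTCDateTime(2024, 1, 1)))              # None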
12 changes: 12 additions & 0 deletions eqcorrscan/core/match_filter/family.py
@@ -606,6 +606,18 @@ def lag_calc(self, stream, pre_processed, shift_len=0.2, min_cc=0.4,
         """
         from eqcorrscan.core.lag_calc import xcorr_pick_family

+        # We should make sure we have events calculated for all detections
+        # we should clean out anything that was there before
+        # (and warn the user)
+        _overwritten_warning = False
+        for d in self.detections:
+            if len(d.event.picks):
+                _overwritten_warning = True
+            d._calculate_event(template=self.template)
+        if _overwritten_warning:
+            Logger.warning("Old events in family have been overwritten to "
+                           "ensure lag-calc runs as expected")
+
         processed_stream = self._process_streams(
             stream=stream, pre_processed=pre_processed,
             process_cores=process_cores, parallel=parallel,
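The effect of this addition is that Family.lag_calc becomes idempotent with respect to detection events: any earlier picks are discarded and events are rebuilt before cross-correlation picking, with a single warning. A runnable toy of the guard pattern, using a stand-in class rather than a real Detection:

import logging

logging.basicConfig(level=logging.WARNING)
Logger = logging.getLogger("family")

class FakeDetection:
    """Stand-in for eqcorrscan's Detection, purely for illustration."""
    def __init__(self, picks):
        self.event_picks = picks

    def recalculate_event(self):
        # Stands in for Detection._calculate_event(template=...)
        self.event_picks = []

detections = [FakeDetection(picks=["old P pick"]), FakeDetection(picks=[])]

overwritten = False
for d in detections:
    if len(d.event_picks):
        overwritten = True
    d.recalculate_event()
if overwritten:
    Logger.warning("Old events in family have been overwritten to "
                   "ensure lag-calc runs as expected")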
2 changes: 1 addition & 1 deletion eqcorrscan/core/match_filter/helpers/tribe.py
@@ -82,7 +82,7 @@ def _download_st(
     """
     Helper to download a stream from a client for a given start and end time.

-    Applies `buff` to extend download to (heopfully) ensure all data are
+    Applies `buff` to extend download to (hopefully) ensure all data are
     provided. Retries download up to `retries` times, and discards data
     with large gaps.
23 changes: 18 additions & 5 deletions eqcorrscan/core/match_filter/party.py
@@ -21,7 +21,6 @@
 import logging
 from os.path import join
 import warnings
-from tempfile import template

 import numpy as np
 from obspy import Catalog, read_events, Stream
@@ -924,16 +923,28 @@ def client_lag_calc(
             template_channel_ids = {
                 tuple(tr.id.split('.')) for f in chunk for tr in f.template.st}
             st = _download_st(
-                starttime=chunk_start, endtime=chunk_end, buff=300.0,
+                starttime=chunk_start, endtime=chunk_end + pad, buff=300.0,
                 min_gap=min_gap, template_channel_ids=template_channel_ids,
                 client=client, retries=retries)
+            Logger.info(f"Downloaded data for {len(st)} channels, expected "
+                        f"{len(template_channel_ids)} channels")
+            if len(st) == 0:
+                Logger.warning(
+                    f"No data meeting standards available between "
+                    f"{chunk_start} and {chunk_end}, skipping {len(chunk)} "
+                    f"detections")
+                chunk_start += (chunk_length - chunk_overlap)
+                chunk_end = chunk_start + chunk_length
+                # catalog += chunk.get_catalog()
+                continue

             # 3. Run lag-calc
-            catalog += chunk.lag_calc(
+            _picked_catalog = chunk.lag_calc(
                 stream=st, pre_processed=False, shift_len=shift_len,
                 *args, **kwargs)
+            Logger.debug(f"For {len(chunk)} a catalog of "
+                         f"{len(_picked_catalog)} was returned")
+            catalog += _picked_catalog

             # 4. Run next group
             detections_run.update({d.id for f in chunk for d in f})
@@ -1059,19 +1070,21 @@ def lag_calc(self, stream, pre_processed, shift_len=0.2, min_cc=0.4,
         for template_group in template_groups:
             family = [_f for _f in self.families
                       if _f.template == template_group[0]][0]
-            group_seed_ids = {tr.id for template in template_group
-                              for tr in template.st}
+            group_seed_ids = {tr.id for _template in template_group
+                              for tr in _template.st}
             template_stream = Stream()
             for seed_id in group_seed_ids:
                 net, sta, loc, chan = seed_id.split('.')
                 template_stream += stream.select(
                     network=net, station=sta, location=loc, channel=chan)
             # Process once and only once for each group.
+            Logger.info("Processing stream for group")
             processed_stream = family._process_streams(
                 stream=template_stream, pre_processed=pre_processed,
                 process_cores=process_cores, parallel=parallel,
                 ignore_bad_data=ignore_bad_data, ignore_length=ignore_length,
                 select_used_chans=False)
+            Logger.info(f"Processed_stream: {processed_stream}")
             for template in template_group:
                 family = [_f for _f in self.families
                           if _f.template == template][0]
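The key behavioural fix in client_lag_calc is that an empty download no longer stalls the loop: the window is advanced by the stride (chunk_length - chunk_overlap) and processing continues. A runnable toy of that pattern, with made-up chunk values and a fake downloader standing in for _download_st:

chunk_length, chunk_overlap = 3600.0, 600.0   # assumed values, in seconds
chunk_start, endtime = 0.0, 4 * 3600.0

def download(start, end):
    """Stand-in for _download_st; pretend the second window has no data."""
    return [] if 3000.0 <= start < 6000.0 else ["tr1", "tr2"]

while chunk_start < endtime:
    chunk_end = chunk_start + chunk_length
    st = download(chunk_start, chunk_end)
    if len(st) == 0:
        # Skip this window, but keep walking forward through time
        chunk_start += (chunk_length - chunk_overlap)
        continue
    print(f"processing {len(st)} channels for {chunk_start}-{chunk_end}")
    chunk_start += (chunk_length - chunk_overlap)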
31 changes: 28 additions & 3 deletions eqcorrscan/tests/pre_processing_test.py
@@ -269,6 +269,31 @@ def test_process_bad_data(self):
                 filt_order=3, samp_rate=1, starttime=False,
                 seisan_chan_names=True, ignore_length=False)

+    def test_process_partial_bad_data(self):
+        """
+        Check that processing works as expected when we have
+        partial bad data. Issue #592
+        """
+        bad_data = Stream([self.st[0].slice(
+            self.st[0].stats.starttime,
+            self.st[0].stats.starttime + 20).copy()])
+        bad_data += self.st[1].copy()
+        with self.assertRaises(NotImplementedError):
+            multi_process(
+                st=bad_data, lowcut=0.1, highcut=0.4,
+                filt_order=3, samp_rate=1,
+                starttime=bad_data[1].stats.starttime,
+                endtime=bad_data[1].stats.endtime,
+                seisan_chan_names=True, ignore_length=False)
+        # Check that it just drops that trace with ignore_bad_data
+        st_processed = multi_process(
+            st=bad_data, lowcut=0.1, highcut=0.4,
+            filt_order=3, samp_rate=1,
+            starttime=bad_data[1].stats.starttime,
+            endtime=bad_data[1].stats.endtime,
+            seisan_chan_names=True, ignore_bad_data=True)
+        self.assertEqual(len(st_processed), 1)
+
     def test_short_data_fail(self):
         """Check that we don't allow too much missing data."""
         with self.assertRaises(NotImplementedError):
@@ -283,9 +308,9 @@ def test_short_data_fail(self):
     def test_short_data_pass(self):
         """Check that we do allow missing data if ignore_length is True."""
         processed = multi_process(
-            st=self.st[0].copy().trim(endtime=self.
-                                      st[0].stats.endtime - 18000), lowcut=0.1,
-            highcut=0.4, filt_order=3, samp_rate=1,
+            st=self.st[0].copy().trim(
+                endtime=self.st[0].stats.endtime - 18000),
+            lowcut=0.1, highcut=0.4, filt_order=3, samp_rate=1,
             starttime=self.day_start,
             endtime=UTCDateTime(self.day_start) + 86400.,
             seisan_chan_names=True, ignore_length=True)
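For reference, the behaviour pinned by test_process_partial_bad_data can be reproduced outside the test suite. The sketch below uses ObsPy's bundled example stream instead of the test fixtures; the filter and sampling values are illustrative, and it assumes the same short-data thresholds the test exercises:

from obspy import read
from eqcorrscan.utils.pre_processing import multi_process

full = read()                        # ObsPy example stream: 3 traces, ~30 s
bad = full[0].slice(
    full[0].stats.starttime,
    full[0].stats.starttime + 5).copy()   # keep only 5 s of one trace
stream = read()[1:] + bad            # one short trace among full ones

# With ignore_bad_data=True the short trace should be dropped rather
# than NotImplementedError being raised for the whole stream.
processed = multi_process(
    st=stream, lowcut=2.0, highcut=9.0, filt_order=3, samp_rate=20.0,
    starttime=full[0].stats.starttime, endtime=full[0].stats.endtime,
    ignore_bad_data=True)
print(len(processed))                # expect 2: the full-length traces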
19 changes: 13 additions & 6 deletions eqcorrscan/utils/pre_processing.py
@@ -275,6 +275,7 @@ def multi_process(st, lowcut, highcut, filt_order, samp_rate, parallel=False,
                 Logger.info(
                     f"{trace_id} not found in {set(tr.id for tr in st)},"
                     f" ignoring")
+    Logger.info(f"Stream after QC is: {st}")

     # 3. Detrend
     # ~ 2x speedup for 50 100 Hz daylong traces on 12 threads
@@ -288,21 +289,26 @@ def multi_process(st, lowcut, highcut, filt_order, samp_rate, parallel=False,
         for i, _ in enumerate(st):
             if float(st[i].stats.npts / st[i].stats.sampling_rate) != length:
                 Logger.info(
-                    'Data for {0} are not long-enough, will zero pad'.format(
-                        st[i].id))
+                    'Data for {0} are not long-enough, will try to '
+                    'zero pad'.format(st[i].id))
                 st[i], padded[st[i].id] = _length_check(
                     st[i], starttime=starttime, length=length,
                     ignore_length=ignore_length,
                     ignore_bad_data=ignore_bad_data)
         # Remove None traces that might be returned from length checking
-        st.traces = [tr for tr in st if tr is not None]
+        st.traces = [tr for tr in st if tr is not None and tr.stats.npts]

     # Check that we actually still have some data
     if not _stream_has_data(st):
         if tracein:
-            return st[0]
+            if len(st):
+                return st[0]
+            else:
+                return Trace()
         return st

+    Logger.info(f"Stream after length check and padding is: {st}")
+
     # 5. Resample
     # ~ 3.25x speedup for 50 100 Hz daylong traces on 12 threads
     st = _multi_resample(
@@ -802,6 +808,7 @@ def _group_process(filt_order, highcut, lowcut, samp_rate, process_length,

     starttime = starttimes[0]
     endtime = endtimes[-1]
+    stream_endtime = copy.deepcopy(endtime)
     data_len_samps = round((endtime - starttime) * samp_rate) + 1
     assert overlap < process_length, "Overlap must be less than process length"
     chunk_len_samps = (process_length - overlap) * samp_rate
@@ -883,11 +890,11 @@
                                f" and {_endtime}")
                 continue

-        if _endtime < stream[0].stats.endtime:
+        if _endtime < stream_endtime:
             Logger.warning(
                 "Last bit of data between {0} and {1} will go unused "
                 "because it is shorter than a chunk of {2} s".format(
-                    _endtime, stream[0].stats.endtime, process_length))
+                    _endtime, stream_endtime, process_length))
         return processed_streams
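The _group_process change fixes a subtle mutation bug: stream[0].stats.endtime changes as traces are trimmed during chunking, so the final "unused data" comparison could reference the wrong time. A small ObsPy demonstration of why the end time is snapshotted before the loop:

import copy
from obspy import read

st = read()                          # ObsPy example stream, ~30 s long
stream_endtime = copy.deepcopy(st[0].stats.endtime)   # snapshot first

st[0].trim(endtime=st[0].stats.starttime + 10)   # chunking mutates stats
print(st[0].stats.endtime)   # now ~10 s after the start time
print(stream_endtime)        # still the original end time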
