Skip to content

Commit

Permalink
Basic tests for all processes
Browse files Browse the repository at this point in the history
  • Loading branch information
calum-chamberlain committed Dec 8, 2023
1 parent 40e28b5 commit 0e2e5da
Showing 1 changed file with 157 additions and 11 deletions.
168 changes: 157 additions & 11 deletions eqcorrscan/tests/matched_filter/helper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,24 @@
from obspy.clients.fdsn import Client
from obspy.clients.earthworm import Client as EWClient

from eqcorrscan.core.match_filter import Party
from eqcorrscan.core.match_filter.helpers import get_waveform_client
from eqcorrscan.core.match_filter.helpers.processes import (
_get_detection_stream, Poison)
_get_detection_stream, Poison, _prepper, _pre_processor,
_make_detections)


Logger = logging.getLogger(__name__)


def get_test_templates():
party = Party().read(
filename=os.path.join(
os.path.abspath(os.path.dirname(os.path.dirname(__file__))),
'test_data', 'test_party.tgz'))
return [f.template for f in party]


class TestHelperFunctions(unittest.TestCase):
def test_monkey_patching(self):
""" Test that monkey patching a client works. """
Expand All @@ -49,6 +59,145 @@ def tearDownClass(cls):
pass


class TestMakeDetections(ProcessTests):
directories_to_nuke = []
wait_time = 5

@classmethod
def setUpClass(cls):
templates = get_test_templates()
cls.global_kwargs = dict(
delta=0.01,
templates=templates,
threshold=8.0,
threshold_type="MAD",
save_progress=False,
)

def setUp(self):
self.kwargs = copy.copy(self.global_kwargs)
self.kwargs.update(dict(
input_queue=Queue(),
output_queue=Queue(),
poison_queue=Queue()
))

def test_poisoning(self):
self.process = Process(
target=_make_detections, kwargs=self.kwargs,
name="TestProcess")
poisoning(obj=self)

def test_poisoning_while_waiting_on_output(self):
self.process = Process(
target=_make_detections, kwargs=self.kwargs,
name="TestProcess")
poisoning_while_waiting_on_output(obj=self)

def test_poisoning_from_input(self):
self.process = Process(
target=_make_detections, kwargs=self.kwargs,
name="TestProcess")
poisoning_from_input(
obj=self, input_queue=self.kwargs['input_queue'])


class TestPrepper(ProcessTests):
directories_to_nuke = []
wait_time = 5

@classmethod
def setUpClass(cls):
templates = get_test_templates()
cls.global_kwargs = dict(
templates=templates,
group_size=5,
groups=None,
xcorr_func="fftw",
)

def setUp(self):
self.kwargs = copy.copy(self.global_kwargs)
self.kwargs.update(dict(
input_stream_filename_queue=Queue(),
output_queue=Queue(),
poison_queue=Queue()
))

def test_poisoning(self):
self.process = Process(
target=_prepper, kwargs=self.kwargs,
name="TestProcess")
poisoning(obj=self)

def test_poisoning_while_waiting_on_output(self):
self.process = Process(
target=_prepper, kwargs=self.kwargs,
name="TestProcess")
poisoning_while_waiting_on_output(obj=self)

def test_poisoning_from_input(self):
self.process = Process(
target=_prepper, kwargs=self.kwargs,
name="TestProcess")
poisoning_from_input(
obj=self, input_queue=self.kwargs['input_stream_filename_queue'])


class TestPreProcessor(ProcessTests):
directories_to_nuke = []
wait_time = 5

@classmethod
def setUpClass(cls):
process_length = 360
cls.global_kwargs = dict(
template_ids={("NZ", "WVZ", "10", "HHZ")},
pre_processed=False,
ignore_length=False,
ignore_bad_data=False,
filt_order=4,
highcut=20,
lowcut=2,
samp_rate=50,
process_length=process_length,
parallel=False,
cores=1,
daylong=False,
overlap=3.0,
)

def setUp(self):
self.kwargs = copy.copy(self.global_kwargs)
self.kwargs.update(dict(
input_stream_queue=Queue(),
temp_stream_dir=tempfile.mkdtemp(),
output_filename_queue=Queue(),
poison_queue=Queue()
))
Logger.info(self.kwargs)
self.directories_to_nuke.append(self.kwargs['temp_stream_dir'])

def test_poisoning(self):
self.process = Process(
target=_pre_processor, kwargs=self.kwargs,
name="TestProcess")
poisoning(obj=self)

def test_poisoning_while_waiting_on_output(self):
self.process = Process(
target=_pre_processor, kwargs=self.kwargs,
name="TestProcess")
poisoning_while_waiting_on_output(obj=self)

def test_poisoning_from_input(self):
self.process = Process(
target=_pre_processor, kwargs=self.kwargs,
name="TestProcess")
poisoning_from_input(
obj=self, input_queue=self.kwargs['input_stream_queue'])


class TestGetDetectionStreamProcess(ProcessTests):
directories_to_nuke = []
wait_time = 5
Expand Down Expand Up @@ -86,7 +235,7 @@ def setUp(self):
output_filename_queue=Queue(),
temp_stream_dir=tempfile.mkdtemp()))
Logger.info(self.kwargs)
# Cleanup
# Cleanup
self.directories_to_nuke.append(self.kwargs['temp_stream_dir'])

def test_poisoning(self):
Expand All @@ -105,7 +254,8 @@ def test_poisoning_from_input(self):
self.process = Process(
target=_get_detection_stream, kwargs=self.kwargs,
name="TestProcess")
poisoning_from_input(obj=self)
poisoning_from_input(
obj=self, input_queue=self.kwargs['input_time_queue'])

def test_normal_operation(self):
self.process = Process(
Expand Down Expand Up @@ -187,9 +337,9 @@ def test_exception_handling(self):
self.assertFalse(process.is_alive())


################################################################################
###############################################################################
# STANDARD PROCESS DEATH TESTS
################################################################################
###############################################################################


def poisoning(obj: ProcessTests):
Expand All @@ -198,7 +348,6 @@ def poisoning(obj: ProcessTests):
obj.kwargs['poison_queue'].put(Exception("TestException"))
time.sleep(obj.wait_time)
obj.assertFalse(obj.process.is_alive())
obj.directories_to_nuke.append(obj.kwargs['temp_stream_dir'])


def poisoning_while_waiting_on_output(obj: ProcessTests):
Expand All @@ -207,16 +356,13 @@ def poisoning_while_waiting_on_output(obj: ProcessTests):
obj.kwargs['poison_queue'].put(Exception("TestException"))
time.sleep(obj.wait_time)
obj.assertFalse(obj.process.is_alive())
obj.directories_to_nuke.append(obj.kwargs['temp_stream_dir'])


def poisoning_from_input(obj):
def poisoning_from_input(obj: ProcessTests, input_queue: Queue):
# Test death
obj.kwargs['input_time_queue'].put(Poison(Exception("TestException")))
input_queue.put(Poison(Exception("TestException")))
time.sleep(obj.wait_time)
obj.assertFalse(obj.process.is_alive())
# Cleanup
shutil.rmtree(obj.kwargs['temp_stream_dir'])


if __name__ == '__main__':
Expand Down

0 comments on commit 0e2e5da

Please sign in to comment.