diff --git a/eqcorrscan/tests/matched_filter/helper_test.py b/eqcorrscan/tests/matched_filter/helper_test.py index f8a1b6361..37fcb1ccc 100644 --- a/eqcorrscan/tests/matched_filter/helper_test.py +++ b/eqcorrscan/tests/matched_filter/helper_test.py @@ -17,14 +17,24 @@ from obspy.clients.fdsn import Client from obspy.clients.earthworm import Client as EWClient +from eqcorrscan.core.match_filter import Party from eqcorrscan.core.match_filter.helpers import get_waveform_client from eqcorrscan.core.match_filter.helpers.processes import ( - _get_detection_stream, Poison) + _get_detection_stream, Poison, _prepper, _pre_processor, + _make_detections) Logger = logging.getLogger(__name__) +def get_test_templates(): + party = Party().read( + filename=os.path.join( + os.path.abspath(os.path.dirname(os.path.dirname(__file__))), + 'test_data', 'test_party.tgz')) + return [f.template for f in party] + + class TestHelperFunctions(unittest.TestCase): def test_monkey_patching(self): """ Test that monkey patching a client works. """ @@ -49,6 +59,145 @@ def tearDownClass(cls): pass +class TestMakeDetections(ProcessTests): + directories_to_nuke = [] + wait_time = 5 + + @classmethod + def setUpClass(cls): + templates = get_test_templates() + cls.global_kwargs = dict( + delta=0.01, + templates=templates, + threshold=8.0, + threshold_type="MAD", + save_progress=False, + ) + + def setUp(self): + self.kwargs = copy.copy(self.global_kwargs) + self.kwargs.update(dict( + input_queue=Queue(), + output_queue=Queue(), + poison_queue=Queue() + )) + + def test_poisoning(self): + self.process = Process( + target=_make_detections, kwargs=self.kwargs, + name="TestProcess") + poisoning(obj=self) + + def test_poisoning_while_waiting_on_output(self): + self.process = Process( + target=_make_detections, kwargs=self.kwargs, + name="TestProcess") + poisoning_while_waiting_on_output(obj=self) + + def test_poisoning_from_input(self): + self.process = Process( + target=_make_detections, kwargs=self.kwargs, + name="TestProcess") + poisoning_from_input( + obj=self, input_queue=self.kwargs['input_queue']) + + +class TestPrepper(ProcessTests): + directories_to_nuke = [] + wait_time = 5 + + @classmethod + def setUpClass(cls): + templates = get_test_templates() + cls.global_kwargs = dict( + templates=templates, + group_size=5, + groups=None, + xcorr_func="fftw", + ) + + def setUp(self): + self.kwargs = copy.copy(self.global_kwargs) + self.kwargs.update(dict( + input_stream_filename_queue=Queue(), + output_queue=Queue(), + poison_queue=Queue() + )) + + def test_poisoning(self): + self.process = Process( + target=_prepper, kwargs=self.kwargs, + name="TestProcess") + poisoning(obj=self) + + def test_poisoning_while_waiting_on_output(self): + self.process = Process( + target=_prepper, kwargs=self.kwargs, + name="TestProcess") + poisoning_while_waiting_on_output(obj=self) + + def test_poisoning_from_input(self): + self.process = Process( + target=_prepper, kwargs=self.kwargs, + name="TestProcess") + poisoning_from_input( + obj=self, input_queue=self.kwargs['input_stream_filename_queue']) + + +class TestPreProcessor(ProcessTests): + directories_to_nuke = [] + wait_time = 5 + + @classmethod + def setUpClass(cls): + process_length = 360 + cls.global_kwargs = dict( + template_ids={("NZ", "WVZ", "10", "HHZ")}, + pre_processed=False, + ignore_length=False, + ignore_bad_data=False, + filt_order=4, + highcut=20, + lowcut=2, + samp_rate=50, + process_length=process_length, + parallel=False, + cores=1, + daylong=False, + overlap=3.0, + ) + + def setUp(self): + self.kwargs = copy.copy(self.global_kwargs) + self.kwargs.update(dict( + input_stream_queue=Queue(), + temp_stream_dir=tempfile.mkdtemp(), + output_filename_queue=Queue(), + poison_queue=Queue() + )) + Logger.info(self.kwargs) + self.directories_to_nuke.append(self.kwargs['temp_stream_dir']) + + def test_poisoning(self): + self.process = Process( + target=_pre_processor, kwargs=self.kwargs, + name="TestProcess") + poisoning(obj=self) + + def test_poisoning_while_waiting_on_output(self): + self.process = Process( + target=_pre_processor, kwargs=self.kwargs, + name="TestProcess") + poisoning_while_waiting_on_output(obj=self) + + def test_poisoning_from_input(self): + self.process = Process( + target=_pre_processor, kwargs=self.kwargs, + name="TestProcess") + poisoning_from_input( + obj=self, input_queue=self.kwargs['input_stream_queue']) + + class TestGetDetectionStreamProcess(ProcessTests): directories_to_nuke = [] wait_time = 5 @@ -86,7 +235,7 @@ def setUp(self): output_filename_queue=Queue(), temp_stream_dir=tempfile.mkdtemp())) Logger.info(self.kwargs) - # Cleanup + # Cleanup self.directories_to_nuke.append(self.kwargs['temp_stream_dir']) def test_poisoning(self): @@ -105,7 +254,8 @@ def test_poisoning_from_input(self): self.process = Process( target=_get_detection_stream, kwargs=self.kwargs, name="TestProcess") - poisoning_from_input(obj=self) + poisoning_from_input( + obj=self, input_queue=self.kwargs['input_time_queue']) def test_normal_operation(self): self.process = Process( @@ -187,9 +337,9 @@ def test_exception_handling(self): self.assertFalse(process.is_alive()) -################################################################################ +############################################################################### # STANDARD PROCESS DEATH TESTS -################################################################################ +############################################################################### def poisoning(obj: ProcessTests): @@ -198,7 +348,6 @@ def poisoning(obj: ProcessTests): obj.kwargs['poison_queue'].put(Exception("TestException")) time.sleep(obj.wait_time) obj.assertFalse(obj.process.is_alive()) - obj.directories_to_nuke.append(obj.kwargs['temp_stream_dir']) def poisoning_while_waiting_on_output(obj: ProcessTests): @@ -207,16 +356,13 @@ def poisoning_while_waiting_on_output(obj: ProcessTests): obj.kwargs['poison_queue'].put(Exception("TestException")) time.sleep(obj.wait_time) obj.assertFalse(obj.process.is_alive()) - obj.directories_to_nuke.append(obj.kwargs['temp_stream_dir']) -def poisoning_from_input(obj): +def poisoning_from_input(obj: ProcessTests, input_queue: Queue): # Test death - obj.kwargs['input_time_queue'].put(Poison(Exception("TestException"))) + input_queue.put(Poison(Exception("TestException"))) time.sleep(obj.wait_time) obj.assertFalse(obj.process.is_alive()) - # Cleanup - shutil.rmtree(obj.kwargs['temp_stream_dir']) if __name__ == '__main__':