Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
calum-chamberlain committed Sep 6, 2023
2 parents bc09a6b + 8c6c324 commit 9b32ec2
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 19 deletions.
26 changes: 22 additions & 4 deletions rt_eqcorrscan/console_scripts/build_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
GPL v3.0
"""

import os
import logging
import faulthandler

Expand All @@ -29,17 +30,25 @@ def run(
chunk_size: float = 30,
rebuild: bool = True,
max_workers: int = None,
save_raw: bool = False,
**kwargs
):
config = read_config(config_file=kwargs.get("config_file", None))
debug = kwargs.get("debug", False)
working_dir = kwargs.get("working_dir", None)
if debug:
config.log_level = "DEBUG"
print(f"Using the following configuration:\n{config}")
config.setup_logging()
Logger.debug("Running in debug mode - expect lots of output!")

if working_dir:
Logger.info(f"Changing to working directory: {working_dir}")
os.chdir(working_dir)


client = config.rt_match_filter.get_client()
waveform_client = config.rt_match_filter.get_waveform_client()

template_bank = TemplateBank(
config.database_manager.event_path,
Expand Down Expand Up @@ -69,7 +78,8 @@ def run(
Logger.info(f"Will make templates for {len(catalog)} events")

tribe = template_bank.make_templates(
catalog=catalog, rebuild=rebuild, client=client, **config.template)
catalog=catalog, rebuild=rebuild, client=waveform_client,
save_raw=save_raw, **config.template)
Logger.info(f"Made {len(tribe)} templates")


Expand Down Expand Up @@ -104,6 +114,13 @@ def main():
"-n", "--max-workers", type=int, default=None,
help="Maximum workers for ProcessPoolExecutor, defaults to the number "
"of cores on the machine")
parser.add_argument(
"--save-raw", action="store_true",
help="Flag to turn on saving of raw miniseed waveforms")
parser.add_argument(
"-w", "--working-dir", type=str,
help="Working directory - will change to this directory after reading "
"config file. All paths must be correct for this working dir.")

args = parser.parse_args()

Expand All @@ -124,11 +141,12 @@ def main():
else:
endtime = UTCDateTime()

kwargs.update({"debug": args.debug, "config_file": args.config})
kwargs.update({"debug": args.debug, "config_file": args.config,
"working_dir": args.working_dir})
run(starttime=starttime, endtime=endtime,
chunk_size=args.chunk_interval, rebuild=args.rebuild,
max_workers=args.max_workers, **kwargs)
max_workers=args.max_workers, save_raw=args.save_raw, **kwargs)


if __name__ == "__main__":
main()
main()
13 changes: 12 additions & 1 deletion rt_eqcorrscan/console_scripts/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
GPL v3.0
"""

import os
import logging
import faulthandler

Expand All @@ -34,12 +35,17 @@ def run(**kwargs):
debug = kwargs.get("debug", False)
update_bank = kwargs.get("update_bank", True)
listener_starttime = kwargs.get("listener_starttime", None)
working_dir = kwargs.get("working_dir", '.')
if debug:
config.log_level = "DEBUG"
print("Using the following configuration:\n{0}".format(config))
config.setup_logging()
Logger.debug("Running in debug mode - expect lots of output!")

if working_dir:
Logger.info(f"Changing to working directory: {working_dir}")
os.chdir(working_dir)

client = config.rt_match_filter.get_client()

trigger_func = partial(
Expand Down Expand Up @@ -96,12 +102,17 @@ def main():
"-s", "--listener-starttime", type=UTCDateTime,
help="UTCDateTime parsable starttime for the listener - will collect "
"events from this date to now and react to them.")
parser.add_argument(
"-w", "--working-dir", type=str,
help="Working directory - will change to this directory after reading "
"config file. All paths must be correct for this working dir.")

args = parser.parse_args()

kwargs.update({"debug": args.debug, "config_file": args.config,
"update_bank": args.update_bank,
"listener_starttime": args.listener_starttime})
"listener_starttime": args.listener_starttime,
"working_dir": args.working_dir})
run(**kwargs)


Expand Down
35 changes: 22 additions & 13 deletions rt_eqcorrscan/reactor/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,30 +172,39 @@ def run(self, max_run_length: float = None) -> None:
# Query the catalog in the listener every so often and check
self._running = True
first_iteration = True
previous_old_events , working_cat = [], Catalog() # Initialise state
while self._running:
old_events = deepcopy(self.listener.old_events)
Logger.info(f"Old events from the listener has {len(old_events)} events")
# Clear out stale events from working_cat
event_ids = [_[0] for _ in old_events]
working_cat.events = [ev for ev in working_cat if ev.resource_id in event_ids]
new_old_events = [ev for ev in old_events if ev not in previous_old_events]
# Get these locally to avoid accessing shared memory multiple times
if len(old_events) > 0:
working_ids = [_[0] for _ in old_events]
working_cat = self.template_database.get_events(
eventid=working_ids)
if len(working_ids) and not len(working_cat):
if len(new_old_events) > 0:
working_ids = [_[0] for _ in new_old_events]
Logger.info(f"Getting event info from database for {', '.join(working_ids)}")
try:
new_working_cat = self.template_database.get_events(
eventid=working_ids, _allow_update=False)
except Exception as e:
Logger.error(f"Could not get template events from database due to {e}")
if len(working_ids) and not len(new_working_cat):
Logger.warning("Error getting events from database, getting individually")
working_cat = Catalog()
for working_id in working_ids:
try:
working_cat += self.template_database.get_events(
eventid=working_id)
eventid=working_id, _allow_update=False)
except Exception as e:
Logger.error(f"Could not read {working_id} due to {e}")
continue
else:
working_cat = []
Logger.info("Currently analysing a catalog of {0} events".format(
len(working_cat)))
self.process_new_events(new_events=working_cat)
Logger.debug("Finished processing new events")
else:
working_cat += new_working_cat
Logger.info("Currently analysing a catalog of {0} events".format(
len(working_cat)))
self.process_new_events(new_events=working_cat)
Logger.debug("Finished processing new events")
previous_old_events = old_events # Overload
self.set_up_time(UTCDateTime.now())
Logger.debug(f"Up-time: {self.up_time}")
if max_run_length is not None and self.up_time >= max_run_length:
Expand Down
3 changes: 2 additions & 1 deletion rt_eqcorrscan/rt_match_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class RealTimeTribe(Tribe):
_backfill_tribe = Tribe() # Tribe of as-yet unused templates for backfilling
_last_backfill_start = UTCDateTime.now() # Time of last backfill run - update on run
_number_of_backfillers = 0 # Book-keeping of backfiller processes.
_clean_backfillers = False # If false will leave temporary backfiller dirs
_clean_backfillers = True # If false will leave temporary backfiller dirs

busy = False

Expand Down Expand Up @@ -881,6 +881,7 @@ def run(
process_cores=self.process_cores,
parallel_process=self._parallel_processing,
ignore_bad_data=True, copy_data=False,
concurrent_processing=False,
**kwargs)
Logger.info("Completed detection")
except Exception as e: # pragma: no cover
Expand Down

0 comments on commit 9b32ec2

Please sign in to comment.