Skip to content

Commit

Permalink
try to add progressbar
Browse files Browse the repository at this point in the history
  • Loading branch information
calum-chamberlain committed Jan 8, 2024
1 parent 3af0af4 commit ae67f9a
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions rt_eqcorrscan/database/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from typing import Optional, Union, Callable, Iterable, List
from concurrent.futures import Executor
from multiprocessing import cpu_count
from functools import partial

from obsplus import EventBank
Expand Down Expand Up @@ -127,6 +128,17 @@ def _summarize_template(
return out


def _chunksize(n_tasks: int, max_workers: int = None) -> int:
max_workers = max_workers or cpu_count()
return n_tasks // (max_workers - 1)


def _workers(executor) -> int:
if hasattr(executor, '_max_workers'):
return executor._max_workers
return None


class _Result(object):
"""
Thin imitation of concurrent.futures.Future
Expand Down Expand Up @@ -165,6 +177,7 @@ class _SerialExecutor(Executor):
>>> print(results)
[0, 1, 4, 9, 16]
"""
_max_workers = 1
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -292,7 +305,9 @@ def get_templates(self, use_pickled: bool = True, **kwargs) -> Tribe:
paths = list(self._template_paths(**kwargs))
_tread = partial(_lazy_template_read, read_pickle=use_pickled)
return Tribe(list(_ for _ in tqdm.tqdm(
(t for t in self.executor.map(_tread, paths)),
(t for t in self.executor.map(
_tread, paths,
chunksize=_chunksize(len(paths), _workers(self.executor)))),
total=len(paths)) if _ is not None))
# future = self.executor.map(_tread, paths)
# return Tribe([t for t in future if t is not None])
Expand Down Expand Up @@ -335,14 +350,19 @@ def put_templates(
bank_path=self.bank_path)
with self.index_lock:
_ = list(tqdm.tqdm(
(_ for _ in self.executor.map(inner_put_template, templates)),
(_ for _ in self.executor.map(
inner_put_template, templates,
chunksize=_chunksize(len(templates),
_workers(self.executor)))),
total=len(templates)))

def pickle_templates(self, **kwargs) -> List:
""" Pickle templates in the db for faster reading later. """
paths = [p for p in self._template_paths(**kwargs)]
Logger.info(f"Pickling {len(paths)} templates...")
future = self.executor.map(_pickle_template, paths)
future = self.executor.map(
_pickle_template, paths,
chunksize=_chunksize(len(paths), _workers(self.executor)))
issues = list(_ for _ in tqdm.tqdm(
(f for f in future), total=len(paths))
if _ is not None)
Expand Down

0 comments on commit ae67f9a

Please sign in to comment.