Skip to content

Commit

Permalink
Batch processing after init on ensembles
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 20, 2019
1 parent 6657bc3 commit e7e5aa5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,26 @@ def drop(self, localpath, **kwargs):
except ValueError:
pass # Allow localpath to be missing in some realizations

def process_batch(self, batch=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modifies the realization
object, not for functions that returns a dataframe already.
Args:
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
Returns:
ScratchEnsemble: This ensemble object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
for realization in self._realizations.values():
realization.process_batch(batch)
return self

def apply(self, callback, **kwargs):
"""Callback functionalty, apply a function to every realization
Expand Down
17 changes: 17 additions & 0 deletions src/fmu/ensemble/ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,23 @@ def drop(self, localpath, **kwargs):
except ValueError:
pass # Allow localpath to be missing in some ensembles.

def process_batch(self, batch=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modifies the realization
object, not for functions that returns a dataframe already.
Args:
batch (list): Each list element is a dictionary with one key,
being a function names, value pr key is a dict with keyword
arguments to be supplied to each function.
"""
for ensemble in self._ensembles.values():
if isinstance(ensemble, ScratchEnsemble):
ensemble.process_batch(batch)

def apply(self, callback, **kwargs):
"""Callback functionalty, apply a function to every realization
Expand Down
16 changes: 15 additions & 1 deletion tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ def test_batch():
assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490
assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25

# Also possible to batch process afterwards:
ens = ScratchEnsemble(
"reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0"
)
ens.process_batch(
batch=[
{"load_scalar": {"localpath": "npv.txt"}},
{"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
{"load_smry": {"column_keys": "*", "time_index": "daily"}},
],
)
assert len(ens.get_df("npv.txt")) == 5
assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490
assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25


def test_emptyens():
"""Check that we can initialize an empty ensemble"""
Expand Down Expand Up @@ -273,7 +288,6 @@ def test_reek001_scalars():
assert isinstance(npv, pd.DataFrame)
assert "REAL" in npv
assert "npv.txt" in npv # filename is the column name
print(npv)
assert len(npv) == 5
assert npv.dtypes["REAL"] == int
assert npv.dtypes["npv.txt"] == object
Expand Down
13 changes: 13 additions & 0 deletions tests/test_ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ def rms_vol2df(kwargs):
assert len(ensset5.get_df("unsmry--yearly")) == 50
assert len(ensset5.get_df("unsmry--daily")) == 10980

# Try batch processing after initialization:
ensset6 = EnsembleSet("reek001", frompath=ensdir)
ensset6.process_batch(
batch=[
{"load_scalar": {"localpath": "npv.txt"}},
{"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
{"load_smry": {"column_keys": "*", "time_index": "daily"}},
],
)
assert len(ensset5.get_df("npv.txt")) == 10
assert len(ensset5.get_df("unsmry--yearly")) == 50
assert len(ensset5.get_df("unsmry--daily")) == 10980

# Delete the symlink and leftover from apply-testing when we are done.
for real_dir in glob.glob(ensdir + "/realization-*"):
if not SKIP_FMU_TOOLS:
Expand Down

0 comments on commit e7e5aa5

Please sign in to comment.