From e7e5aa5a49c70550e9f3807126f66aac68fcb02d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Fri, 13 Dec 2019 13:00:17 +0100 Subject: [PATCH] Batch processing after init on ensembles --- src/fmu/ensemble/ensemble.py | 20 ++++++++++++++++++++ src/fmu/ensemble/ensembleset.py | 17 +++++++++++++++++ tests/test_ensemble.py | 16 +++++++++++++++- tests/test_ensembleset.py | 13 +++++++++++++ 4 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index b27ab23f..eae8d26f 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -874,6 +874,26 @@ def drop(self, localpath, **kwargs): except ValueError: pass # Allow localpath to be missing in some realizations + def process_batch(self, batch=None): + """Process a list of functions to run/apply + + This is equivalent to calling each function individually + but this enables more efficient concurrency. It is meant + to be used for functions that modifies the realization + object, not for functions that returns a dataframe already. + + Args: + batch (list): Each list element is a dictionary with one key, + being a function names, value pr key is a dict with keyword + arguments to be supplied to each function. + Returns: + ScratchEnsemble: This ensemble object (self), for it + to be picked up by ProcessPoolExecutor and pickling. + """ + for realization in self._realizations.values(): + realization.process_batch(batch) + return self + def apply(self, callback, **kwargs): """Callback functionalty, apply a function to every realization diff --git a/src/fmu/ensemble/ensembleset.py b/src/fmu/ensemble/ensembleset.py index a3a60c43..8a2de982 100644 --- a/src/fmu/ensemble/ensembleset.py +++ b/src/fmu/ensemble/ensembleset.py @@ -447,6 +447,23 @@ def drop(self, localpath, **kwargs): except ValueError: pass # Allow localpath to be missing in some ensembles. + def process_batch(self, batch=None): + """Process a list of functions to run/apply + + This is equivalent to calling each function individually + but this enables more efficient concurrency. It is meant + to be used for functions that modifies the realization + object, not for functions that returns a dataframe already. + + Args: + batch (list): Each list element is a dictionary with one key, + being a function names, value pr key is a dict with keyword + arguments to be supplied to each function. + """ + for ensemble in self._ensembles.values(): + if isinstance(ensemble, ScratchEnsemble): + ensemble.process_batch(batch) + def apply(self, callback, **kwargs): """Callback functionalty, apply a function to every realization diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index e0a0d38c..58e4de09 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -200,6 +200,21 @@ def test_batch(): assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490 assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25 + # Also possible to batch process afterwards: + ens = ScratchEnsemble( + "reektest", testdir + "/data/testensemble-reek001/" + "realization-*/iter-0" + ) + ens.process_batch( + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + ], + ) + assert len(ens.get_df("npv.txt")) == 5 + assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490 + assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25 + def test_emptyens(): """Check that we can initialize an empty ensemble""" @@ -273,7 +288,6 @@ def test_reek001_scalars(): assert isinstance(npv, pd.DataFrame) assert "REAL" in npv assert "npv.txt" in npv # filename is the column name - print(npv) assert len(npv) == 5 assert npv.dtypes["REAL"] == int assert npv.dtypes["npv.txt"] == object diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index 1dc93785..1cbad066 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -237,6 +237,19 @@ def rms_vol2df(kwargs): assert len(ensset5.get_df("unsmry--yearly")) == 50 assert len(ensset5.get_df("unsmry--daily")) == 10980 + # Try batch processing after initialization: + ensset6 = EnsembleSet("reek001", frompath=ensdir) + ensset6.process_batch( + batch=[ + {"load_scalar": {"localpath": "npv.txt"}}, + {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + ], + ) + assert len(ensset5.get_df("npv.txt")) == 10 + assert len(ensset5.get_df("unsmry--yearly")) == 50 + assert len(ensset5.get_df("unsmry--daily")) == 10980 + # Delete the symlink and leftover from apply-testing when we are done. for real_dir in glob.glob(ensdir + "/realization-*"): if not SKIP_FMU_TOOLS: