Skip to content

Commit

Permalink
Forward batch command from ensembles and ensemblesets
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 20, 2019
1 parent 275145c commit 6657bc3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 17 deletions.
30 changes: 23 additions & 7 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class ScratchEnsemble(object):
off to gain more fined tuned control.
manifest: dict or filename to use for manifest. If filename,
it must be a yaml-file that will be parsed to a single dict.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length-1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""

def __init__(
Expand All @@ -84,6 +89,7 @@ def __init__(
runpathfilter=None,
autodiscovery=True,
manifest=None,
batch=None,
):
self._name = ensemble_name # ensemble name
self._realizations = {} # dict of ScratchRealization objects,
Expand Down Expand Up @@ -129,13 +135,13 @@ def __init__(
# Search and locate minimal set of files
# representing the realizations.
count = self.add_realizations(
paths, realidxregexp, autodiscovery=autodiscovery
paths, realidxregexp, autodiscovery=autodiscovery, batch=batch
)

if isinstance(runpathfile, str) and runpathfile:
count = self.add_from_runpathfile(runpathfile, runpathfilter)
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)
if isinstance(runpathfile, pd.DataFrame) and not runpathfile.empty:
count = self.add_from_runpathfile(runpathfile, runpathfilter)
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)

if manifest:
# The _manifest variable is set using a property decorator
Expand Down Expand Up @@ -205,7 +211,9 @@ def _shortcut2path(keys, shortpath):
# calling function handle further errors.
return shortpath

def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
def add_realizations(
self, paths, realidxregexp=None, autodiscovery=True, batch=None
):
"""Utility function to add realizations to the ensemble.
Realizations are identified by their integer index.
Expand All @@ -220,6 +228,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
to file system. Absolute or relative paths.
autodiscovery (boolean): whether files can be attempted
auto-discovered
batch (list): Batch commands sent to each realization.
Returns:
count (int): Number of realizations successfully added.
Expand All @@ -236,7 +245,10 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
count = 0
for realdir in globbedpaths:
realization = ScratchRealization(
realdir, realidxregexp=realidxregexp, autodiscovery=autodiscovery
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if realization.index is None:
logger.critical(
Expand All @@ -252,7 +264,7 @@ def add_realizations(self, paths, realidxregexp=None, autodiscovery=True):
logger.info("add_realizations() found %d realizations", len(self._realizations))
return count

def add_from_runpathfile(self, runpath, runpathfilter=None):
def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
"""Add realizations from a runpath file typically
coming from ERT.
Expand All @@ -269,6 +281,7 @@ def add_from_runpathfile(self, runpath, runpathfilter=None):
a Pandas DataFrame parsed from a runpath file
runpathfilter (str). A filter which each filepath has to match
in order to be included. Default None which means not filter
batch (list): Batch commands to be sent to each realization.
Returns:
int: Number of successfully added realizations.
Expand Down Expand Up @@ -299,7 +312,10 @@ def add_from_runpathfile(self, runpath, runpathfilter=None):
continue
logger.info("Adding realization from %s", row["runpath"])
realization = ScratchRealization(
row["runpath"], index=int(row["index"]), autodiscovery=False
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
batch=batch,
)
# Use the ECLBASE from the runpath file to
# ensure we recognize the correct UNSMRY file
Expand Down
34 changes: 25 additions & 9 deletions src/fmu/ensemble/ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ class EnsembleSet(object):
autodiscovery: boolean, sent to initializing Realization objects,
instructing them on whether certain files should be
auto-discovered.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length-1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""

def __init__(
Expand All @@ -67,6 +70,7 @@ def __init__(
iterregexp=None,
batchregexp=None,
autodiscovery=True,
batch=None,
):
self._name = name
self._ensembles = {} # Dictionary indexed by each ensemble's name.
Expand Down Expand Up @@ -101,21 +105,25 @@ def __init__(
return

if ensembles and isinstance(ensembles, list):
if batch:
logger.warning(
"Batch commands not procesed when loading finished ensembles"
)
for ensemble in ensembles:
if isinstance(ensemble, (ScratchEnsemble, VirtualEnsemble)):
self._ensembles[ensemble.name] = ensemble
else:
logger.warning("Supplied object was not an ensemble")
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")

if frompath:
self.add_ensembles_frompath(
frompath,
realidxregexp,
iterregexp,
batchregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")
Expand All @@ -124,7 +132,7 @@ def __init__(
if not os.path.exists(runpathfile):
logger.error("Could not open runpath file %s", runpathfile)
raise IOError
self.add_ensembles_fromrunpath(runpathfile)
self.add_ensembles_fromrunpath(runpathfile, batch=batch)
if not self._ensembles:
logger.warning("No ensembles added to EnsembleSet")

Expand Down Expand Up @@ -171,6 +179,7 @@ def add_ensembles_frompath(
iterregexp=None,
batchregexp=None,
autodiscovery=True,
batch=None,
):
"""Convenience function for adding multiple ensembles.
Expand All @@ -189,7 +198,10 @@ def add_ensembles_frompath(
autodiscovery: boolean, sent to initializing Realization objects,
instructing them on whether certain files should be
auto-discovered.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length-1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
"""
# Try to catch the most common use case and make that easy:
if isinstance(paths, str):
Expand Down Expand Up @@ -244,7 +256,7 @@ def add_ensembles_frompath(
for path in globbedpaths:
real = None
iterr = None # 'iter' is a builtin..
batch = None
batchname = None
for path_comp in reversed(path.split(os.path.sep)):
realmatch = re.match(realidxregexp, path_comp)
if realmatch:
Expand All @@ -258,9 +270,9 @@ def add_ensembles_frompath(
for path_comp in reversed(path.split(os.path.sep)):
batchmatch = re.match(batchregexp, path_comp)
if batchmatch:
batch = str(itermatch.group(1))
batchname = str(itermatch.group(1))
break
df_row = {"path": path, "real": real, "iter": iterr, "batch": batch}
df_row = {"path": path, "real": real, "iter": iterr, "batch": batchname}
paths_df = paths_df.append(df_row, ignore_index=True)
paths_df.fillna(value="Unknown", inplace=True)
# Initialize ensemble objects for each iter found:
Expand All @@ -282,10 +294,11 @@ def add_ensembles_frompath(
pathsforiter,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
self._ensembles[ens.name] = ens

def add_ensembles_fromrunpath(self, runpathfile):
def add_ensembles_fromrunpath(self, runpathfile, batch=None):
"""Add one or many ensembles from an ERT runpath file.
autodiscovery is not an argument, it is by default set to False
Expand All @@ -305,7 +318,10 @@ def add_ensembles_fromrunpath(self, runpathfile):
# Make a runpath slice, and initialize from that:
ens_runpath = runpath_df[runpath_df["iter"] == iterr]
ens = ScratchEnsemble(
"iter-" + str(iterr), runpathfile=ens_runpath, autodiscovery=False
"iter-" + str(iterr),
runpathfile=ens_runpath,
autodiscovery=False,
batch=batch,
)
self._ensembles[ens.name] = ens

Expand Down
1 change: 0 additions & 1 deletion src/fmu/ensemble/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def __init__(
self.load_txt("parameters.txt")

if batch:
print(batch)
self.process_batch(batch)

logger.info("Initialized %s", abspath)
Expand Down
22 changes: 22 additions & 0 deletions tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,28 @@ def test_reek001(tmp="TMP"):
assert len(reekensemble.keys()) == keycount - 1


def test_batch():
    """Check that batch commands given to the constructor are processed."""
    # When __file__ is undefined (e.g. the body was pasted into an
    # interactive session), resolve test data relative to the working dir.
    if "__file__" not in globals():
        here = os.path.abspath(".")
    else:
        here = os.path.dirname(os.path.abspath(__file__))

    realdirs = here + "/data/testensemble-reek001/" + "realization-*/iter-0"
    batch_commands = [
        {"load_scalar": {"localpath": "npv.txt"}},
        {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
        {"load_smry": {"column_keys": "*", "time_index": "daily"}},
    ]
    ens = ScratchEnsemble("reektest", realdirs, batch=batch_commands)

    # Every batch command should have left its data retrievable
    # from the ensemble under the corresponding key.
    assert len(ens.get_df("npv.txt")) == 5
    assert len(ens.get_df("unsmry--daily")["FOPR"]) == 5490
    assert len(ens.get_df("unsmry--yearly")["FOPT"]) == 25


def test_emptyens():
"""Check that we can initialize an empty ensemble"""
ens = ScratchEnsemble("emptyens")
Expand Down
14 changes: 14 additions & 0 deletions tests/test_ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ def rms_vol2df(kwargs):
assert isinstance(ensset4["iter-0"], ScratchEnsemble)
assert isinstance(ensset4["iter-1"], ScratchEnsemble)

# Try the batch command feature:
ensset5 = EnsembleSet(
"reek001",
frompath=ensdir,
batch=[
{"load_scalar": {"localpath": "npv.txt"}},
{"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
{"load_smry": {"column_keys": "*", "time_index": "daily"}},
],
)
assert len(ensset5.get_df("npv.txt")) == 10
assert len(ensset5.get_df("unsmry--yearly")) == 50
assert len(ensset5.get_df("unsmry--daily")) == 10980

# Delete the symlink and leftover from apply-testing when we are done.
for real_dir in glob.glob(ensdir + "/realization-*"):
if not SKIP_FMU_TOOLS:
Expand Down

0 comments on commit 6657bc3

Please sign in to comment.