Skip to content

Commit

Permalink
Add batch processing function
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 12, 2019
1 parent 99d0a24 commit 5674527
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
43 changes: 42 additions & 1 deletion src/fmu/ensemble/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,15 @@ class ScratchRealization(object):
override anything else.
autodiscovery (boolean): whether the realization should try to
auto-discover certain data (UNSMRY files in standard location)
batch (dict): Dictionary of which functions (load_*) that
should be run at time of initialization. Each key in the
dictionary should be a function name among supported functions,
and each keys value should be the function arguments.
"""

def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True):
def __init__(
self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None
):
self._origpath = os.path.abspath(path)
self.index = None
self._autodiscovery = autodiscovery
Expand Down Expand Up @@ -165,8 +171,43 @@ def __init__(self, path, realidxregexp=None, index=None, autodiscovery=True):
if os.path.exists(os.path.join(abspath, "parameters.txt")):
self.load_txt("parameters.txt")

if batch:
print(batch)
self.process_batch(batch)

logger.info("Initialized %s", abspath)

def process_batch(self, batch):
"""Process a dict of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency
Args:
batch (dict): Keys are function names, value pr key is
a dict with keyword arguments to be supplied to each function.
"""
assert isinstance(batch, dict)
allowed_functions = [
"load_smry",
"load_txt",
"load_csv",
"load_status",
"load_scalar",
]
for cmd in batch:
logger.info(
"Batch processing (#%d): %s with args %s",
self.index,
str(cmd),
str(batch[cmd]),
)
if cmd not in allowed_functions:
logging.warning("process_batch skips not-permitted function " + cmd)
continue
assert isinstance(batch[cmd], dict)
getattr(self, cmd)(**batch[cmd])

def runpath(self):
"""Return the runpath ("root") of the realization
Expand Down
23 changes: 22 additions & 1 deletion tests/test_realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
SKIP_FMU_TOOLS = True

fmux = etc.Interaction()
logger = fmux.basiclogger(__name__, level="WARNING")
logger = fmux.basiclogger(__name__, level="INFO")

if not fmux.testsetup():
raise SystemExit()
Expand Down Expand Up @@ -160,6 +160,27 @@ def test_single_realization():
real.load_csv("bogus.csv")


def test_batch():
"""Test batch processing at time of object initialization"""
if "__file__" in globals():
# Easen up copying test code into interactive sessions
testdir = os.path.dirname(os.path.abspath(__file__))
else:
testdir = os.path.abspath(".")

realdir = os.path.join(testdir, "data/testensemble-reek001", "realization-0/iter-0")
real = ensemble.ScratchRealization(
realdir,
batch={
"load_scalar": {"localpath": "npv.txt"},
"load_smry": {"column_keys": "FOPT", "time_index": "yearly"},
"load_smry": {"column_keys": "*", "time_index": "daily"},
},
)
assert real.get_df("npv.txt") == 3444
assert len(real.get_df("unsmry--yearly")["FOPT"])


def test_volumetric_rates():
"""Test computation of volumetric rates from cumulative vectors"""

Expand Down

0 comments on commit 5674527

Please sign in to comment.