Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing MPIEvaluator: Run on multi-node HPC systems using mpi4py #299

Merged
merged 5 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
include:
- os: ubuntu-latest
python-version: "3.10"
test-mpi: true
- os: ubuntu-latest
python-version: "3.9"
- os: ubuntu-latest
Expand All @@ -42,6 +43,12 @@ jobs:
run: |
pip install --upgrade pip
pip install .[dev,cov] ${{ matrix.pip-pre }}
- name: Install MPI and mpi4py
if: matrix.test-mpi == true
run: |
sudo apt-get update
sudo apt-get install -y libopenmpi-dev
pip install mpi4py
- name: Test with Pytest
timeout-minutes: 15
run:
Expand Down
1 change: 1 addition & 0 deletions ema_workbench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Constant,
Scenario,
Policy,
MPIEvaluator,
MultiprocessingEvaluator,
IpyparallelEvaluator,
SequentialEvaluator,
Expand Down
2 changes: 2 additions & 0 deletions ema_workbench/em_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"perform_experiments",
"optimize",
"IpyparallelEvaluator",
"MPIEvaluator",
"MultiprocessingEvaluator",
"SequentialEvaluator",
"ReplicatorModel",
Expand Down Expand Up @@ -76,6 +77,7 @@
from .evaluators import (
perform_experiments,
optimize,
MPIEvaluator,
MultiprocessingEvaluator,
SequentialEvaluator,
Samplers,
Expand Down
66 changes: 66 additions & 0 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import threading
import warnings
import logging

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
Expand Down Expand Up @@ -415,6 +416,71 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
add_tasks(self.n_processes, self._pool, ex_gen, callback)


class MPIEvaluator(BaseEvaluator):
    """Evaluator that distributes experiments over an mpi4py ``MPIPoolExecutor``.

    Intended for multi-node HPC systems. The executor is created lazily in
    ``initialize`` so that mpi4py is only required when this evaluator is
    actually used.
    """

    def __init__(self, msis, n_processes=None, **kwargs):
        """Set up the evaluator; the MPI pool itself is not started here.

        Parameters
        ----------
        msis : model structure interface(s), forwarded to BaseEvaluator
        n_processes : int, optional
            maximum number of MPI workers; None lets mpi4py decide
        """
        super().__init__(msis, **kwargs)
        warnings.warn(
            "The MPIEvaluator is experimental. Its interface and functionality might change in future releases.\n"
            "We welcome your feedback at: https://github.com/quaquel/EMAworkbench/discussions/311",
            FutureWarning,
        )
        self._pool = None
        self.n_processes = n_processes

    def initialize(self):
        """Start the MPI pool and return self."""
        # Deferred import: mpi4py stays an optional dependency of the workbench.
        from mpi4py.futures import MPIPoolExecutor

        self._pool = MPIPoolExecutor(max_workers=self.n_processes)
        # NOTE(review): _max_workers is a private mpi4py attribute — confirm it
        # remains available across mpi4py releases.
        n_workers = self._pool._max_workers
        _logger.info(f"MPI pool started with {n_workers} workers")
        if n_workers <= 10:
            _logger.warning(
                f"With only a few workers ({n_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
            )
        return self

    def finalize(self):
        """Shut down the MPI pool."""
        self._pool.shutdown()
        _logger.info("MPI pool has been shut down")

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """Run all generated experiments on the MPI pool, feeding results to callback."""
        generated = experiment_generator(scenarios, self._msis, policies, combine=combine)

        # Each task bundles the experiment with everything a remote rank needs
        # to rebuild the model registry on its side.
        tasks = []
        for entry in generated:
            tasks.append((entry, entry.model_name, self._msis))

        _logger.info(
            f"MPIEvaluator: Starting {len(tasks)} experiments using MPI pool with {self._pool._max_workers} workers"
        )
        results = self._pool.map(run_experiment_mpi, tasks)

        _logger.info(f"MPIEvaluator: Completed all {len(tasks)} experiments")
        for entry, outcomes in results:
            callback(entry, outcomes)
        _logger.info(f"MPIEvaluator: Callback completed for all {len(tasks)} experiments")


def run_experiment_mpi(packed_data):
    """Execute one experiment on an MPI worker rank.

    Parameters
    ----------
    packed_data : tuple
        (experiment, model_name, msis) as packed by MPIEvaluator.

    Returns
    -------
    tuple
        (experiment, outcomes) for the callback on the root rank.
    """
    # Imported inside the worker: only ranks executing tasks touch MPI itself.
    from mpi4py.MPI import COMM_WORLD

    rank = COMM_WORLD.Get_rank()
    experiment, _model_name, msis = packed_data

    _logger.debug(f"MPI Rank {rank}: starting {repr(experiment)}")

    # Rebuild the model registry locally so the runner can resolve the
    # experiment's model by name on this rank.
    registry = NamedObjectMap(AbstractModel)
    registry.extend(msis)
    runner = ExperimentRunner(registry)
    outcomes = runner.run_experiment(experiment)

    _logger.debug(f"MPI Rank {rank}: completed {experiment}")

    return experiment, outcomes


class IpyparallelEvaluator(BaseEvaluator):
"""evaluator for using an ipyparallel pool"""

Expand Down
10 changes: 9 additions & 1 deletion ema_workbench/util/ema_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,18 @@ def get_rootlogger():
return _rootlogger


def log_to_stderr(level=None):
def log_to_stderr(level=None, pass_root_logger_level=False):
"""
Turn on logging and add a handler which prints to stderr
EwoutH marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
level : int
minimum level of the messages that will be logged
pass_root_logger_level: bool, optional. Default False
if true, all module loggers will be set to the
same logging level as the root logger.
Recommended True when using the MPIEvaluator.

"""

Expand All @@ -206,4 +210,8 @@ def log_to_stderr(level=None):
logger.addHandler(handler)
logger.propagate = False

if pass_root_logger_level:
for _, mod_logger in _module_loggers.items():
mod_logger.setLevel(level)

return logger
46 changes: 46 additions & 0 deletions test/test_em_framework/test_evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import unittest.mock as mock
import unittest
import platform

import ema_workbench
from ema_workbench.em_framework import evaluators
Expand Down Expand Up @@ -74,6 +75,51 @@ def test_ipyparallel_evaluator(
evaluator.evaluate_experiments(10, 10, mocked_callback)
lb_view.map.called_once()

# Check if mpi4py is installed and if we're on a Linux environment
try:
    import mpi4py

    MPI_AVAILABLE = True
except ImportError:
    MPI_AVAILABLE = False
# MPI tests only run on Linux; other platforms are skipped by the guard below.
IS_LINUX = platform.system() == "Linux"

@unittest.skipUnless(
    MPI_AVAILABLE and IS_LINUX, "Test requires mpi4py installed and a Linux environment"
)
@mock.patch("mpi4py.futures.MPIPoolExecutor")
@mock.patch("ema_workbench.em_framework.evaluators.DefaultCallback")
@mock.patch("ema_workbench.em_framework.evaluators.experiment_generator")
def test_mpi_evaluator(self, mocked_generator, mocked_callback, mocked_MPIPoolExecutor):
    """Smoke-test MPIEvaluator against a fully mocked MPI pool.

    Verifies the evaluator creates the pool once, maps the packed experiments
    over it, and shuts the pool down when the context manager exits.
    """
    # Redundant with the skipUnless guard above, but yields a clearer failure
    # message if the guard is ever bypassed.
    try:
        import mpi4py
    except ImportError:
        self.fail(
            "mpi4py is not installed. It's required for this test. Install with: pip install mpi4py"
        )

    model = mock.Mock(spec=ema_workbench.Model)
    model.name = "test"

    # Create a mock experiment with the required attribute
    mock_experiment = mock.Mock()
    mock_experiment.model_name = "test"
    mocked_generator.return_value = [mock_experiment]

    # Fake pool: one mapped result, plus the worker count the evaluator logs.
    pool_mock = mock.Mock()
    pool_mock.map.return_value = [(1, ({}, {}))]
    pool_mock._max_workers = 5  # Arbitrary number
    mocked_MPIPoolExecutor.return_value = pool_mock

    with evaluators.MPIEvaluator(model) as evaluator:
        evaluator.evaluate_experiments(10, 10, mocked_callback)

    mocked_MPIPoolExecutor.assert_called_once()
    pool_mock.map.assert_called_once()

    # Check that pool shutdown was called
    pool_mock.shutdown.assert_called_once()

def test_perform_experiments(self):
    # TODO: placeholder — perform_experiments is not yet covered by this suite.
    pass

Expand Down