Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: AiiDA generator #1995

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from .platforms.paramiko_submitter import ParamikoSubmitter
from .platforms.platform import Platform
from .migrate.migrate import Migrate
from .generators import Engine, get_engine_generator

dialog = None
from time import sleep
Expand Down Expand Up @@ -87,7 +88,6 @@
import autosubmit.helpers.autosubmit_helper as AutosubmitHelper
import autosubmit.statistics.utils as StatisticsUtils
from autosubmit.helpers.utils import proccess_id, terminate_child_process, check_jobs_file_exists

from contextlib import suppress

"""
Expand Down Expand Up @@ -684,6 +684,18 @@ def parse_args():
help='Select the status (one or more) to filter the list of jobs.')
subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS',
help='Final status of killed jobs. Default is FAILED.')

# Generate
subparser = subparsers.add_parser(
'generate', description='Generate a workflow definition for a different workflow engine',
argument_default=argparse.SUPPRESS)
subparser.add_argument('expid', help='experiment identifier')
subsubparser = subparser.add_subparsers(title="engines", dest='engine', required=True, description='Workflow engine identifier')
for engine in Engine:
generator_class = get_engine_generator(engine)
parser_engine = subsubparser.add_parser(engine.value, help=f"{generator_class.get_engine_name()}")
generator_class.add_parse_args(parser_engine)

args, unknown = parser.parse_known_args()
if args.version:
Log.info(Autosubmit.autosubmit_version)
Expand All @@ -694,6 +706,7 @@ def parse_args():
except SystemExit as e:
return 1
except BaseException as e:
raise e
raise AutosubmitCritical(
"Incorrect arguments for this command", 7011)

Expand Down Expand Up @@ -790,6 +803,9 @@ def parse_args():
return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect)
elif args.command == 'stop':
return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target)
elif args.command == 'generate':
return Autosubmit.generate_workflow(args.expid, Engine[args.engine], args)

@staticmethod
def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'):
Log.set_console_level(console_level)
Expand Down Expand Up @@ -6104,5 +6120,47 @@ def retrieve_expids():
terminate_child_process(expid)


@staticmethod
def generate_workflow(expid: str, engine: Engine, args: argparse.Namespace) -> None:
"""Generate the workflow configuration for a different backend engine."""
Log.info(f'Generate workflow configuration for {engine}')

# TODO check the code below, if it makes sense, this I have not touched from original MR
try:
Log.info("Getting job list...")
as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
as_conf.check_conf_files(False)

submitter = Autosubmit._get_submitter(as_conf)
submitter.load_platforms(as_conf)
if len(submitter.platforms) == 0:
raise ValueError('Missing platform!')

packages_persistence = JobPackagePersistence(
os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False)

Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)

hpc_architecture = as_conf.get_platform()
for job in job_list.get_job_list():
if job.platform_name is None or job.platform_name == '':
job.platform_name = hpc_architecture
job.platform = submitter.platforms[job.platform_name]
job.update_parameters(as_conf, job_list.parameters)

job_list.check_scripts(as_conf)
except AutosubmitError as e:
raise AutosubmitCritical(e.message, e.code, e.trace)
except AutosubmitCritical as e:
raise
except BaseException as e:
raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040,
str(e))

generator_class = get_engine_generator(engine)
parser = argparse.ArgumentParser()
generator_class.add_parse_args(parser)
generator_input_keys = vars(parser.parse_args('')).keys()
generator_kwargs = {key: args.__getattribute__(key) for key in generator_input_keys}
generator_class.generate(job_list, as_conf, **generator_kwargs)
47 changes: 47 additions & 0 deletions autosubmit/generators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from enum import Enum
from importlib import import_module
from typing import AbstractSet, Callable, cast
from abc import ABC, abstractmethod


"""This module provides generators to produce workflow configurations for different backend engines."""

class Engine(Enum):
"""Workflow Manager engine flavors."""
aiida = 'aiida'

def __str__(self):
return self.value


# TODO COMMENT: we can make this alse a protocol, but I don't see the reason here for that
class AbstractGenerator(ABC):
"""Generator of workflow for an engine."""

@staticmethod
@abstractmethod
def get_engine_name() -> str:
"""The engine name used for the help text."""
raise NotImplementedError

@staticmethod
@abstractmethod
def add_parse_args(parser) -> None:
"""Adds arguments to the parser that are needed for a specific engine implementation."""
raise NotImplementedError

# TODO COMMENT: This could be also a __init__ plus method, but I thought one method is easier, then the implementation can do whatever
@classmethod
@abstractmethod
def generate(cls, job_list, as_conf, **arg_options) -> None:
"""Generates the workflow from the created autosubmit workflow."""
raise NotImplementedError


def get_engine_generator(engine: Engine) -> AbstractGenerator:
return import_module(f'autosubmit.generators.{engine.value}').Generator

__all__ = [
'Engine',
'get_engine_generator'
]
Loading