Skip to content

Commit

Permalink
feat: add more detailed error handling in worker execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jkglasbrenner committed Sep 12, 2024
1 parent 1c7535c commit ff8c470
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 13 deletions.
43 changes: 37 additions & 6 deletions src/dioptra/restapi/v1/workflows/lib/run_dioptra_job.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ from structlog.stdlib import BoundLogger

from dioptra.sdk.utilities.contexts import env_vars
from dioptra.sdk.utilities.contexts import sys_path_dirs
from dioptra.task_engine.issues import IssueSeverity
from dioptra.task_engine.task_engine import run_experiment
from dioptra.task_engine.validation import is_valid
from dioptra.task_engine.validation import validate

# Module-level structlog logger; `_validate_yaml` derives a logger bound to the job
# and experiment IDs from it when the caller does not pass one.
LOGGER: BoundLogger = structlog.stdlib.get_logger()
# NOTE(review): the consumer of this key is outside this excerpt; presumably it is
# the name under which the job ID is recorded. Confirm against its usage.
DIOPTRA_JOB_ID = "dioptra.jobId"
Expand Down Expand Up @@ -58,12 +59,21 @@ def main(
log.error("Plugins directory does not exist", plugins_dir=str(plugins_dir))
raise ValueError(f"Plugins directory {plugins_dir} does not exist")

job_yaml = _load_job_yaml(JOB_YAML_PATH)
job_parameters = _load_job_params(JOB_PARAMS_JSON_PATH)
try:
job_yaml = _load_job_yaml(JOB_YAML_PATH)

except Exception as e:
log.exception("Could not load job YAML file")
raise e

try:
job_parameters = _load_job_params(JOB_PARAMS_JSON_PATH)

if not is_valid(job_yaml):
log.error("Job YAML was invalid!")
raise ValueError("Job YAML was invalid!")
except Exception as e:
log.exception("Could not load parameters JSON file")
raise e

_validate_yaml(job_yaml, log)

if enable_mlflow_tracking:
if dioptra_client is None:
Expand Down Expand Up @@ -165,6 +175,27 @@ def _load_job_params(filepath: str) -> MutableMapping[str, Any]:
return cast(MutableMapping[str, Any], json.load(f))


def _validate_yaml(
    job_yaml: Mapping[str, Any],
    logger: BoundLogger | None = None,
) -> None:
    """Validate the job YAML, logging every issue and raising if any is an error.

    Args:
        job_yaml: The loaded job YAML mapping, passed as-is to ``validate``.
        logger: Logger used to report issues. When omitted, a new logger bound to
            ``JOB_ID`` and ``EXPERIMENT_ID`` is derived from the module ``LOGGER``.

    Raises:
        ValueError: If at least one issue has severity ``IssueSeverity.ERROR``. The
            exception message lists the messages of all such issues.
    """
    log = logger or LOGGER.new(job_id=JOB_ID, experiment_id=EXPERIMENT_ID)

    errors: list[str] = []

    # Walk every issue before raising so that all warnings and errors are logged,
    # not just the first error encountered.
    for issue in validate(job_yaml):
        if issue.severity is IssueSeverity.WARNING:
            log.warning("Found issue with Job YAML", message=issue.message)

        elif issue.severity is IssueSeverity.ERROR:
            log.error("Found error in Job YAML", message=issue.message)
            errors.append(issue.message)

    if errors:
        raise ValueError(f"Errors found in Job YAML: {errors}")


if __name__ == "__main__":
# Running the file as a script does not enable the MLflow Tracking capability. This
# is useful for debugging whether the job YAML and plugins are set up appropriately.
Expand Down
38 changes: 31 additions & 7 deletions src/dioptra/rq/tasks/run_v1_dioptra_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,41 @@ def run_v1_dioptra_job(job_id: int, experiment_id: int) -> None:
run_dioptra_job_path = working_dir / "run_dioptra_job.py"

# Use client to download the job files for the provided job_id
job_files_package = client.download_job_files(
job_id=job_id, output_dir=working_dir
)
try:
job_files_package = client.download_job_files(
job_id=job_id, output_dir=working_dir
)

except Exception as e:
client.set_job_status(
job_id=job_id, experiment_id=experiment_id, status="failed"
)
log.exception("Could not download job files")
raise e

# Unpack the (trusted) tar.gz file into the working directory.
with tarfile.open(job_files_package, mode="r:*") as tar:
tar.extractall(path=working_dir, filter="data")
try:
with tarfile.open(job_files_package, mode="r:*") as tar:
tar.extractall(path=working_dir, filter="data")

except Exception as e:
client.set_job_status(
job_id=job_id, experiment_id=experiment_id, status="failed"
)
log.exception("Could not extract from tar file")
raise e

# Import the run_dioptra_job.py file as a module
with sys_path_dirs(dirs=(str(working_dir),)):
run_dioptra_job = importlib.import_module(run_dioptra_job_path.stem)
try:
with sys_path_dirs(dirs=(str(working_dir),)):
run_dioptra_job = importlib.import_module(run_dioptra_job_path.stem)

except Exception as e:
client.set_job_status(
job_id=job_id, experiment_id=experiment_id, status="failed"
)
log.exception("Could not import run_dioptra_job.py")
raise e

# Execute the main function in the included script file.
try:
Expand Down

0 comments on commit ff8c470

Please sign in to comment.