Skip to content

Commit

Permalink
Container (#119)
Browse files Browse the repository at this point in the history
* feat: Improving bash functionality, map catalog being specific to iterable

* fix: release process PSR fix to 7.x.x

* fix: release process PSR fix to 7.x.x

* feat: Working container task type

* feat: Adding required accommodations for local container

* feat: accommodating executor

* feat: working container in local with data and parameters synced

* feat: Releasing container task type
  • Loading branch information
vijayvammi authored Aug 5, 2023
1 parent ae9ba36 commit ce2a6dc
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 91 deletions.
83 changes: 64 additions & 19 deletions magnus/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
import logging
import os
import re
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, cast

from pydantic import BaseModel

from magnus import defaults, exceptions, integration, interaction, utils
from magnus.catalog import BaseCatalog
from magnus.graph import Graph
from magnus.nodes import BaseNode

if TYPE_CHECKING:
from magnus.catalog import BaseCatalog
from magnus.datastore import BaseRunLogStore, DataCatalog, StepLog
from magnus.experiment_tracker import BaseExperimentTracker
from magnus.secrets import BaseSecrets
Expand Down Expand Up @@ -63,13 +63,14 @@ def __init__(self, config: dict = None):
self.dag_hash: str = ""
self.execution_plan: str = "" # Chained or unchained
# Services
self.catalog_handler: BaseCatalog = None # type: ignore
self.catalog_handler: Optional[BaseCatalog] = None
self.secrets_handler: BaseSecrets = None # type: ignore
self.experiment_tracker: BaseExperimentTracker = None # type: ignore
self.run_log_store: BaseRunLogStore = None # type: ignore
self.previous_run_log = None

self.context_step_log: Optional[StepLog] = None
self.context_node: Optional[BaseNode] = None

@property
def step_decorator_run_id(self):
Expand Down Expand Up @@ -192,7 +193,9 @@ def prepare_for_node_execution(self):
integration.validate(self, self.experiment_tracker)
integration.configure_for_execution(self, self.experiment_tracker)

def _sync_catalog(self, node: BaseNode, step_log: StepLog, stage: str, synced_catalogs=None):
def _sync_catalog(
self, node: BaseNode, step_log: StepLog, stage: str, synced_catalogs=None
) -> Optional[List[DataCatalog]]:
"""
1). Identify the catalog settings by over-riding node settings with the global settings.
2). For stage = get:
Expand All @@ -208,6 +211,9 @@ def _sync_catalog(self, node: BaseNode, step_log: StepLog, stage: str, synced_ca
step_log (StepLog): The step log corresponding to that node
stage (str): One of get or put
Raises:
Exception: If the stage is not in one of get/put
"""
if stage not in ["get", "put"]:
msg = (
Expand All @@ -216,18 +222,18 @@ def _sync_catalog(self, node: BaseNode, step_log: StepLog, stage: str, synced_ca
)
raise Exception(msg)

node_catalog_settings = node._get_catalog_settings()
node_catalog_settings = cast(BaseNode, self.context_node)._get_catalog_settings()
if not (node_catalog_settings and stage in node_catalog_settings):
# Nothing to get/put from the catalog
return None

# Local compute data folder over rides the global one
compute_data_folder = self.catalog_handler.compute_data_folder
if "compute_data_folder" in node_catalog_settings and node_catalog_settings["compute_data_folder"]:
compute_data_folder = node_catalog_settings["compute_data_folder"]
compute_data_folder = self.get_effective_compute_data_folder()

if not compute_data_folder:
return None

data_catalogs = []
for name_pattern in node_catalog_settings.get(stage) or []: #  Assumes a list
for name_pattern in cast(dict, node_catalog_settings).get(stage) or []: #  Assumes a list
data_catalogs = getattr(self.catalog_handler, stage)(
name=name_pattern,
run_id=self.run_id,
Expand All @@ -240,6 +246,29 @@ def _sync_catalog(self, node: BaseNode, step_log: StepLog, stage: str, synced_ca

return data_catalogs

def get_effective_compute_data_folder(self) -> Optional[str]:
    """
    Get the effective compute data folder for the node currently in context.

    The default is the compute data folder of the catalog handler, but this can be
    over-ridden by a non-empty "compute_data_folder" entry in the catalog settings
    of the context node.

    Returns:
        Optional[str]: The compute data folder defined by the node if it sets one,
            otherwise the one defined by the catalog handler.
    """
    # A node may have no catalog settings at all; treat that as "no override"
    # instead of failing on a membership test against a non-container.
    catalog_settings = cast(BaseNode, self.context_node)._get_catalog_settings() or {}

    compute_data_folder = cast(BaseCatalog, self.catalog_handler).compute_data_folder

    # Only a truthy node level value over-rides the catalog handler default.
    node_compute_data_folder = catalog_settings.get("compute_data_folder")
    if node_compute_data_folder:
        compute_data_folder = node_compute_data_folder

    return compute_data_folder

@property
def step_attempt_number(self) -> int:
"""
Expand Down Expand Up @@ -284,10 +313,13 @@ def _execute_node(self, node: BaseNode, map_variable: dict = None, **kwargs):
max_attempts = max_attempts = node._get_max_attempts()
logger.info(f"Trying to execute node: {node.internal_name}, attempt : {attempt}, max_attempts: {max_attempts}")

data_catalogs_get: List[DataCatalog] = self._sync_catalog(node, step_log, stage="get")
attempt_log = self.run_log_store.create_attempt_log()
try:
self.context_step_log = step_log
self.context_node = node

attempt_log = self.run_log_store.create_attempt_log()
data_catalogs_get: Optional[List[DataCatalog]] = self._sync_catalog(node, step_log, stage="get")

attempt_log = node.execute(executor=self, mock=step_log.mock, map_variable=map_variable, **kwargs)
except Exception as e:
# Any exception here is a magnus exception as node suppresses exceptions.
Expand All @@ -311,8 +343,9 @@ def _execute_node(self, node: BaseNode, map_variable: dict = None, **kwargs):
diff_parameters = utils.diff_dict(parameters_in, parameters_out)
self.run_log_store.set_parameters(self.run_id, diff_parameters)

# Remove the step log context
# Remove the step context
self.context_step_log = None
self.context_node = None

self.run_log_store.add_step_log(step_log, self.run_id)

Expand Down Expand Up @@ -619,7 +652,6 @@ def _resolve_executor_config(self, node: BaseNode):
)

effective_node_config[key] = value

effective_node_config.pop("placeholders", None)

return effective_node_config
Expand Down Expand Up @@ -810,7 +842,7 @@ def docker_image(self) -> str:
Returns:
str: The default docker image to use from the config.
"""
return self.config.docker_image
return self.config.docker_image # type: ignore

def add_code_identities(self, node: BaseNode, step_log: StepLog, **kwargs):
"""
Expand Down Expand Up @@ -882,8 +914,15 @@ def trigger_job(self, node: BaseNode, map_variable: dict = None, **kwargs):
logger.debug("Here is the resolved executor config")
logger.debug(executor_config)

if "run_in_local" in executor_config and executor_config["run_in_local"]:
from magnus.nodes import TaskNode
from magnus.tasks import ContainerTaskType

if executor_config.get("run_in_local", None) or (
cast(TaskNode, node).executable.task_type == ContainerTaskType.task_type
):
# Do not change config but only validate the configuration.
# Trigger the job on local system instead of a container
# Or if the task type is a container, just spin the container.
integration.validate(self, self.run_log_store)
integration.validate(self, self.catalog_handler)
integration.validate(self, self.secrets_handler)
Expand Down Expand Up @@ -919,6 +958,7 @@ def _spin_container(

try:
client = docker.from_env()
api_client = docker.APIClient()
except Exception as ex:
logger.exception("Could not get access to docker")
raise Exception("Could not get the docker socket file, do you have docker installed?") from ex
Expand All @@ -939,13 +979,13 @@ def _spin_container(
container = client.containers.create(
image=docker_image,
command=command,
auto_remove=True,
auto_remove=False,
volumes=self.volumes,
network_mode="host",
environment=environment,
)
container.start()
stream = container.logs(stream=True, follow=True)
stream = api_client.logs(container=container.id, timestamps=True, stream=True, follow=True)
while True:
try:
output = next(stream).decode("utf-8")
Expand All @@ -954,9 +994,14 @@ def _spin_container(
except StopIteration:
logger.info("Docker Run completed")
break
exit_status = api_client.inspect_container(container.id)["State"]["ExitCode"]
container.remove(force=True)
if exit_status != 0:
msg = f"Docker command failed with exit code {exit_status}"
raise Exception(msg)

except Exception as _e:
logger.exception("Problems with spinning up the container")
logger.exception("Problems with spinning/running the container")
raise _e


Expand Down
8 changes: 5 additions & 3 deletions magnus/interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
from pathlib import Path
from typing import Any, Union
from typing import Any, Union, cast

from magnus import defaults, exceptions, pickler, pipeline, utils

Expand Down Expand Up @@ -132,6 +132,7 @@ def get_from_catalog(name: str, destination_folder: str = None):
"""
from magnus import context # pylint: disable=import-outside-toplevel
from magnus.catalog import BaseCatalog

if not context.executor:
msg = (
Expand All @@ -143,7 +144,7 @@ def get_from_catalog(name: str, destination_folder: str = None):
if not destination_folder:
destination_folder = context.executor.catalog_handler.compute_data_folder # type: ignore

data_catalog = context.executor.catalog_handler.get(
data_catalog = cast(BaseCatalog, context.executor.catalog_handler).get(
name,
run_id=context.executor.run_id, # type: ignore
compute_data_folder=destination_folder,
Expand All @@ -168,6 +169,7 @@ def put_in_catalog(filepath: str):
filepath (str): The path of the file to put in the catalog
"""
from magnus import context # pylint: disable=import-outside-toplevel
from magnus.catalog import BaseCatalog

if not context.executor:
msg = (
Expand All @@ -178,7 +180,7 @@ def put_in_catalog(filepath: str):

file_path = Path(filepath)

data_catalog = context.executor.catalog_handler.put(
data_catalog = cast(BaseCatalog, context.executor.catalog_handler).put(
file_path.name,
run_id=context.executor.run_id, # type: ignore
compute_data_folder=file_path.parent,
Expand Down
55 changes: 20 additions & 35 deletions magnus/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ class BaseNode:
"""

node_type = ""
required_fields: List[str] = []
errors_on: List[str] = []

class Config(BaseModel):
class Config:
Expand All @@ -59,7 +57,7 @@ def __init__(self, name, internal_name, config, internal_branch_name=None):
# pylint: disable=R0914,R0913
self.name = name
self.internal_name = internal_name #  Dot notation naming of the steps
self.config = self.Config(**config) # Will hold the config as it comes in
self.config = self.Config(**config)
self.internal_branch_name = internal_branch_name # parallel, map, dag only have internal names
self.is_composite = False

Expand All @@ -74,14 +72,6 @@ def validate(self) -> List[str]:
if "%" in self.name:
messages.append("Node names cannot have '%' in them")

for req in self.required_fields:
if req not in self.config.dict():
messages.append(f"{self.name} should have {req} field")
continue

for err in self.errors_on:
if err in self.config.dict():
messages.append(f"{self.name} should not have {err} field")
return messages

def _to_dict(self) -> dict:
Expand Down Expand Up @@ -394,29 +384,34 @@ class TaskNode(BaseNode):
"""

node_type = "task"
required_fields = ["next_node", "command"]
errors_on = ["branches"]

class Config(BaseNode.Config):
command: str
command_type: str = defaults.COMMAND_TYPE
image: str = ""
class Config(BaseNode.Config, extra=Extra.allow): # type: ignore
next_node: str
catalog: dict = {}
retry: int = 1
on_failure: str = ""
command_config: dict = {}

@classmethod
def get_field_names(cls) -> List[str]:
    """
    Return the names of the fields declared on this config class.

    The names are the keys of ``cls.__fields__``, in that mapping's iteration order.

    Returns:
        List[str]: The declared field names.
    """
    # Iterating a mapping yields its keys; no need to unpack and discard the values.
    return list(cls.__fields__)

def __init__(self, name, internal_name, config, internal_branch_name=None):
super().__init__(name, internal_name, config, internal_branch_name)

self.executable = create_task(
node_name=self.name,
command=self.config.command,
image=self.config.image,
command_type=self.config.command_type,
command_config=self.config.command_config,
)
kwargs_for_command = {
"node_name": self.name,
}

for key, value in self.config.dict().items():
if key not in TaskNode.Config.get_field_names():
# Ignore all the fields that are used by node itself
kwargs_for_command[key] = value

self.executable = create_task(kwargs_for_command)

def execute(self, executor, mock=False, map_variable: dict = None, **kwargs) -> StepAttempt:
"""
Expand Down Expand Up @@ -468,8 +463,6 @@ class FailNode(BaseNode):
"""

node_type = "fail"
required_fields: List[str] = []
errors_on: List[str] = ["next_node", "command", "branches", "on_failure", "catalog"]

def _get_on_failure_node(self) -> Optional[str]:
"""
Expand Down Expand Up @@ -551,8 +544,6 @@ class SuccessNode(BaseNode):
"""

node_type = "success"
required_fields: List[str] = []
errors_on: List[str] = ["next_node", "command", "branches", "on_failure", "catalog"]

def _get_on_failure_node(self) -> Optional[str]:
"""
Expand Down Expand Up @@ -642,8 +633,6 @@ class ParallelNode(BaseNode):
"""

node_type = "parallel"
required_fields = ["next_node", "branches"]
errors_on = ["command", "catalog", "retry"]

class Config(BaseNode.Config):
next_node: str
Expand Down Expand Up @@ -823,8 +812,6 @@ class MapNode(BaseNode):
"""

node_type = "map"
required_fields = ["next_node", "iterate_on", "iterate_as", "branch"]
errors_on = ["command", "catalog", "retry"]

class Config(BaseNode.Config):
next_node: str
Expand Down Expand Up @@ -1049,8 +1036,6 @@ class DagNode(BaseNode):
"""

node_type = "dag"
required_fields = ["next_node", "dag_definition"]
errors_on = ["command", "catalog", "retry"]

class Config(BaseNode.Config):
next_node: str
Expand Down
Loading

0 comments on commit ce2a6dc

Please sign in to comment.