Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container #116

Merged
merged 2 commits into from
Jul 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- uses: actions/setup-python@v4
with:
python-version: 3.8
- run: python -m pip install python-semantic-release
- run: python -m pip install python-semantic-release==7.34.6
- name: Figure version
id: last_tag
run: |
Expand Down
7 changes: 6 additions & 1 deletion magnus/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,10 @@ def create_node(name: str, step_config: dict, internal_branch_name: str = None):
internal_name = internal_branch_name + "." + name

try:
node_type = step_config.pop("type") # Remove the type as it is not used in node creation.
node_mgr = driver.DriverManager(
namespace="nodes",
name=step_config["type"],
name=node_type,
invoke_on_load=True,
invoke_kwds={
"name": name,
Expand All @@ -394,6 +395,10 @@ def create_node(name: str, step_config: dict, internal_branch_name: str = None):
},
)
return node_mgr.driver
except KeyError:
msg = "The node configuration does not contain the required key 'type'."
logger.exception(step_config)
raise Exception(msg)
except Exception as _e:
msg = (
f"Could not find the node type {step_config['type']}. Please ensure you have installed "
Expand Down
54 changes: 19 additions & 35 deletions magnus/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
import logging
import multiprocessing
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from typing import List, Optional, Union

from pydantic import BaseModel, Extra
from stevedore import driver

import magnus
from magnus import defaults, utils
from magnus.datastore import StepAttempt
from magnus.graph import create_graph
from magnus.tasks import create_task

logger = logging.getLogger(defaults.NAME)

Expand Down Expand Up @@ -41,6 +42,9 @@ class BaseNode:
errors_on: List[str] = []

class Config(BaseModel):
class Config:
extra = Extra.forbid

executor_config: dict = {}

def __init__(self, *args, **kwargs):
Expand All @@ -55,7 +59,7 @@ def __init__(self, name, internal_name, config, internal_branch_name=None):
# pylint: disable=R0914,R0913
self.name = name
self.internal_name = internal_name #  Dot notation naming of the steps
self.config = self.Config(**config)
self.config = self.Config(**config) # Will hold the config as it comes in
self.internal_branch_name = internal_branch_name # parallel, map, dag only have internal names
self.is_composite = False

Expand Down Expand Up @@ -394,42 +398,25 @@ class TaskNode(BaseNode):
errors_on = ["branches"]

class Config(BaseNode.Config):
next_node: str
command: str
command_type: str = defaults.COMMAND_TYPE
command_config: dict = {}
image: str = ""
next_node: str
catalog: dict = {}
retry: int = 1
on_failure: str = ""
command_config: dict = {}

def __init__(self, name, internal_name, config, internal_branch_name=None):
super().__init__(name, internal_name, config, internal_branch_name)

task_type = self.config.command_type
logger.info(f"Trying to get a task of type {task_type}")
try:
task_mgr = driver.DriverManager(namespace="tasks", name=task_type, invoke_on_load=False)
except Exception as _e:
msg = (
f"Could not find the task type {task_type}. Please ensure you have installed the extension that"
" provides the task type. "
"\nCore supports: python(default), python-lambda, shell, notebook. python-function"
)
raise Exception(msg) from _e
self.task = task_mgr.driver

def _to_dict(self) -> dict:
"""
The dict representation of the node.

Returns:
dict: The dict representation of the node.
"""
config_dict = dict(self.config.dict())
config_dict["type"] = self.node_type
config_dict["command_config"] = self.config.command_config
config_dict["command_type"] = self.task.task_type
return config_dict
self.executable = create_task(
node_name=self.name,
command=self.config.command,
image=self.config.image,
command_type=self.config.command_type,
command_config=self.config.command_config,
)

def execute(self, executor, mock=False, map_variable: dict = None, **kwargs) -> StepAttempt:
"""
Expand All @@ -450,10 +437,7 @@ def execute(self, executor, mock=False, map_variable: dict = None, **kwargs) ->
attempt_log.status = defaults.SUCCESS
if not mock:
# Do not run if we are mocking the execution, could be useful for caching and dry runs
command_config = {"command": self.config.command}
command_config.update(self.config.command_config)
task = self.task(config=command_config)
task.execute_command(map_variable=map_variable)
self.executable.execute_command(map_variable=map_variable)
except Exception as _e: # pylint: disable=W0703
logger.exception("Task failed")
attempt_log.status = defaults.FAIL
Expand Down Expand Up @@ -684,7 +668,7 @@ def get_sub_graphs(self):
branches = {}
for branch_name, branch_config in self.config.branches.items():
sub_graph = create_graph(
branch_config,
deepcopy(branch_config),
internal_branch_name=self.internal_name + "." + branch_name,
)
branches[self.internal_name + "." + branch_name] = sub_graph
Expand Down Expand Up @@ -892,7 +876,7 @@ def get_sub_graph(self):

branch_config = self.config.branch
branch = create_graph(
branch_config,
deepcopy(branch_config),
internal_branch_name=self.internal_name + "." + self.branch_placeholder_name,
)
return branch
Expand Down
1 change: 1 addition & 0 deletions magnus/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def construct(self, steps: List[Task]):
messages: List[str] = []
for step in steps:
step._construct_node()
print(step.node.__dict__)
messages.extend(step.node.validate()) # type: ignore

if not steps:
Expand Down
Loading