From 2484acfbe3c447cce24874238b378007c288cd7a Mon Sep 17 00:00:00 2001 From: Vijay Vammi Date: Sun, 19 May 2024 06:13:13 +0100 Subject: [PATCH] fix: bug with env secrets, improved capture of std --- examples/02-sequential/default_fail.py | 4 +- examples/configs/argo-config.yaml | 7 --- pyproject.toml | 1 + runnable/__init__.py | 2 + runnable/defaults.py | 2 +- runnable/entrypoints.py | 13 +++-- runnable/extensions/executor/__init__.py | 10 ++++ .../local_container/implementation.py | 55 ++++++++++++++++++- runnable/tasks.py | 47 ++++++++-------- 9 files changed, 100 insertions(+), 41 deletions(-) diff --git a/examples/02-sequential/default_fail.py b/examples/02-sequential/default_fail.py index c504e36d..6c7f8512 100644 --- a/examples/02-sequential/default_fail.py +++ b/examples/02-sequential/default_fail.py @@ -12,12 +12,12 @@ python examples/02-sequential/default_fail.py """ -from examples.common.functions import raise_ex +from examples.common.functions import hello, raise_ex from runnable import Pipeline, PythonTask, Stub def main(): - step1 = Stub(name="step 1") + step1 = PythonTask(name="step 1", function=hello) step2 = PythonTask(name="step 2", function=raise_ex) # This will fail diff --git a/examples/configs/argo-config.yaml b/examples/configs/argo-config.yaml index 36bdbbf9..e697f0d8 100644 --- a/examples/configs/argo-config.yaml +++ b/examples/configs/argo-config.yaml @@ -6,10 +6,6 @@ executor: persistent_volumes: # (3) - name: magnus-volume mount_path: /mnt - secrets_from_k8s: - - environment_variable: AZURE_CLIENT_ID - secret_name: ms-graph - secret_key: AZURE_CLIENT_ID run_log_store: # (4) type: chunked-fs @@ -20,6 +16,3 @@ catalog: type: file-system config: catalog_location: /mnt/catalog - -# secrets: -# type: do-nothing diff --git a/pyproject.toml b/pyproject.toml index 481aa097..8e28360d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ runnable = 'runnable.cli:cli' [tool.poetry.plugins."secrets"] "do-nothing" = "runnable.secrets:DoNothingSecretManager" "dotenv" = "runnable.extensions.secrets.dotenv.implementation:DotEnvSecrets" +"env-secrets" = "runnable.secrets:EnvSecretsManager" # Plugins for Run Log store [tool.poetry.plugins."run_log_store"] diff --git a/runnable/__init__.py b/runnable/__init__.py index fec5e87d..1f789315 100644 --- a/runnable/__init__.py +++ b/runnable/__init__.py @@ -15,6 +15,8 @@ console = Console(record=True) console.print(":runner: Lets go!!") +task_console = Console(record=True) + from runnable.sdk import ( # noqa Catalog, Fail, diff --git a/runnable/defaults.py b/runnable/defaults.py index e3efabcf..7d3ad3f7 100644 --- a/runnable/defaults.py +++ b/runnable/defaults.py @@ -77,7 +77,7 @@ class RunnableConfig(TypedDict, total=False): DEFAULT_EXECUTOR = ServiceConfig(type="local", config={}) DEFAULT_RUN_LOG_STORE = ServiceConfig(type="file-system", config={}) DEFAULT_CATALOG = ServiceConfig(type="file-system", config={}) -DEFAULT_SECRETS = ServiceConfig(type="do-nothing", config={}) +DEFAULT_SECRETS = ServiceConfig(type="env-secrets", config={}) DEFAULT_EXPERIMENT_TRACKER = ServiceConfig(type="do-nothing", config={}) DEFAULT_PICKLER = ServiceConfig(type="pickle", config={}) diff --git a/runnable/entrypoints.py b/runnable/entrypoints.py index efbc8add..303c8269 100644 --- a/runnable/entrypoints.py +++ b/runnable/entrypoints.py @@ -9,7 +9,7 @@ from rich.table import Column import runnable.context as context -from runnable import console, defaults, graph, utils +from runnable import console, defaults, graph, task_console, utils from runnable.defaults import RunnableConfig, ServiceConfig logger = logging.getLogger(defaults.LOGGER_NAME) @@ -165,6 +165,7 @@ def execute( tag=tag, parameters_file=parameters_file, ) + console.print("Working with context:") console.print(run_context) console.rule(style="[dark orange]") @@ -239,7 +240,7 @@ def execute_single_node( """ from runnable import nodes - console.print(f"Executing the single node: {step_name} with map variable: {map_variable}") + task_console.print(f"Executing the single node: {step_name} with map variable: {map_variable}") configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file) @@ -250,9 +251,9 @@ def execute_single_node( tag=tag, parameters_file=parameters_file, ) - console.print("Working with context:") - console.print(run_context) - console.rule(style="[dark orange]") + task_console.print("Working with context:") + task_console.print(run_context) + task_console.rule(style="[dark orange]") executor = run_context.executor run_context.execution_plan = defaults.EXECUTION_PLAN.CHAINED.value @@ -281,7 +282,7 @@ def execute_single_node( node=node_to_execute, map_variable=map_variable_dict, ) - console.save_text(log_file_name) + task_console.save_text(log_file_name) # Put the log file in the catalog run_context.catalog_handler.put(name=log_file_name, run_id=run_context.run_id) diff --git a/runnable/extensions/executor/__init__.py b/runnable/extensions/executor/__init__.py index 4353c9e3..a696dc4f 100644 --- a/runnable/extensions/executor/__init__.py +++ b/runnable/extensions/executor/__init__.py @@ -11,6 +11,7 @@ exceptions, integration, parameters, + task_console, utils, ) from runnable.datastore import DataCatalog, JsonParameter, RunLog, StepLog @@ -340,10 +341,18 @@ def execute_from_graph(self, node: BaseNode, map_variable: TypeMapVariable = Non node.execute_as_graph(map_variable=map_variable, **kwargs) return + task_console.export_text(clear=True) + task_name = node._resolve_map_placeholders(node.internal_name, map_variable) console.print(f":runner: Executing the node {task_name} ... ", style="bold color(208)") self.trigger_job(node=node, map_variable=map_variable, **kwargs) + log_file_name = utils.make_log_file_name(node=node, map_variable=map_variable) + task_console.save_text(log_file_name, clear=True) + + self._context.catalog_handler.put(name=log_file_name, run_id=self._context.run_id) + os.remove(log_file_name) + def trigger_job(self, node: BaseNode, map_variable: TypeMapVariable = None, **kwargs): """ Call this method only if we are responsible for traversing the graph via @@ -493,6 +502,7 @@ def execute_graph(self, dag: Graph, map_variable: TypeMapVariable = None, **kwar logger.info(f"Finished execution of the {branch} with status {run_log.status}") + # We are in the root dag if dag == self._context.dag: run_log = cast(RunLog, run_log) console.print("Completed Execution, Summary:", style="bold color(208)") diff --git a/runnable/extensions/executor/local_container/implementation.py b/runnable/extensions/executor/local_container/implementation.py index 9f826132..5d3fab06 100644 --- a/runnable/extensions/executor/local_container/implementation.py +++ b/runnable/extensions/executor/local_container/implementation.py @@ -5,7 +5,7 @@ from pydantic import Field from rich import print -from runnable import defaults, utils +from runnable import console, defaults, task_console, utils from runnable.datastore import StepLog from runnable.defaults import TypeMapVariable from runnable.extensions.executor import GenericExecutor @@ -96,6 +96,59 @@ def execute_node(self, node: BaseNode, map_variable: TypeMapVariable = None, **k """ return self._execute_node(node, map_variable, **kwargs) + def execute_from_graph(self, node: BaseNode, map_variable: TypeMapVariable = None, **kwargs): + """ + This is the entry point to from the graph execution. + + While the self.execute_graph is responsible for traversing the graph, this function is responsible for + actual execution of the node. + + If the node type is: + * task : We can delegate to _execute_node after checking the eligibility for re-run in cases of a re-run + * success: We can delegate to _execute_node + * fail: We can delegate to _execute_node + + For nodes that are internally graphs: + * parallel: Delegate the responsibility of execution to the node.execute_as_graph() + * dag: Delegate the responsibility of execution to the node.execute_as_graph() + * map: Delegate the responsibility of execution to the node.execute_as_graph() + + Transpilers will NEVER use this method and will NEVER call ths method. + This method should only be used by interactive executors. + + Args: + node (Node): The node to execute + map_variable (dict, optional): If the node if of a map state, this corresponds to the value of iterable. + Defaults to None. + """ + step_log = self._context.run_log_store.create_step_log(node.name, node._get_step_log_name(map_variable)) + + self.add_code_identities(node=node, step_log=step_log) + + step_log.step_type = node.node_type + step_log.status = defaults.PROCESSING + + self._context.run_log_store.add_step_log(step_log, self._context.run_id) + + logger.info(f"Executing node: {node.get_summary()}") + + # Add the step log to the database as per the situation. + # If its a terminal node, complete it now + if node.node_type in ["success", "fail"]: + self._execute_node(node, map_variable=map_variable, **kwargs) + return + + # We call an internal function to iterate the sub graphs and execute them + if node.is_composite: + node.execute_as_graph(map_variable=map_variable, **kwargs) + return + + task_console.export_text(clear=True) + + task_name = node._resolve_map_placeholders(node.internal_name, map_variable) + console.print(f":runner: Executing the node {task_name} ... ", style="bold color(208)") + self.trigger_job(node=node, map_variable=map_variable, **kwargs) + def execute_job(self, node: TaskNode): """ Set up the step log and call the execute node diff --git a/runnable/tasks.py b/runnable/tasks.py index fe3d0755..6d877b2d 100644 --- a/runnable/tasks.py +++ b/runnable/tasks.py @@ -17,7 +17,7 @@ from stevedore import driver import runnable.context as context -from runnable import console, defaults, exceptions, parameters, utils +from runnable import console, defaults, exceptions, parameters, task_console, utils from runnable.datastore import ( JsonParameter, MetricParameter, @@ -144,8 +144,8 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex: if context_param in params: params[param_name].value = params[context_param].value - console.log("Parameters available for the execution:") - console.log(params) + task_console.log("Parameters available for the execution:") + task_console.log(params) logger.debug(f"Resolved parameters: {params}") @@ -153,18 +153,12 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex: params = {key: value for key, value in params.items() if isinstance(value, JsonParameter)} parameters_in = copy.deepcopy(params) - f = io.StringIO() try: - with contextlib.redirect_stdout(f): - # with contextlib.nullcontext(): - yield params + yield params except Exception as e: # pylint: disable=broad-except console.log(e, style=defaults.error_style) logger.exception(e) finally: - print(f.getvalue()) # print to console - f.close() - # Update parameters # This should only update the parameters that are changed at the root level. diff_parameters = self._diff_parameters(parameters_in=parameters_in, context_params=params) @@ -226,9 +220,11 @@ def execute_command( filtered_parameters = parameters.filter_arguments_for_func(f, params.copy(), map_variable) logger.info(f"Calling {func} from {module} with {filtered_parameters}") - user_set_parameters = f(**filtered_parameters) # This is a tuple or single value + out_file = io.StringIO() + with contextlib.redirect_stdout(out_file): + user_set_parameters = f(**filtered_parameters) # This is a tuple or single value + task_console.print(out_file.getvalue()) except Exception as e: - console.log(e, style=defaults.error_style, markup=False) raise exceptions.CommandCallError(f"Function call: {self.command} did not succeed.\n") from e attempt_log.input_parameters = params.copy() @@ -272,8 +268,8 @@ def execute_command( except Exception as _e: msg = f"Call to the function {self.command} did not succeed.\n" attempt_log.message = msg - console.print_exception(show_locals=False) - console.log(_e, style=defaults.error_style) + task_console.print_exception(show_locals=False) + task_console.log(_e, style=defaults.error_style) attempt_log.end_time = str(datetime.now()) @@ -359,7 +355,11 @@ def execute_command( } kwds.update(ploomber_optional_args) - pm.execute_notebook(**kwds) + out_file = io.StringIO() + with contextlib.redirect_stdout(out_file): + pm.execute_notebook(**kwds) + task_console.print(out_file.getvalue()) + context.run_context.catalog_handler.put(name=notebook_output_path, run_id=context.run_context.run_id) client = PloomberClient.from_path(path=notebook_output_path) @@ -380,8 +380,8 @@ def execute_command( ) except PicklingError as e: logger.exception("Notebooks cannot return objects") - console.log("Notebooks cannot return objects", style=defaults.error_style) - console.log(e, style=defaults.error_style) + # task_console.log("Notebooks cannot return objects", style=defaults.error_style) + # task_console.log(e, style=defaults.error_style) logger.exception(e) raise @@ -400,8 +400,7 @@ def execute_command( logger.exception(msg) logger.exception(e) - console.log(msg, style=defaults.error_style) - + # task_console.log(msg, style=defaults.error_style) attempt_log.status = defaults.FAIL attempt_log.end_time = str(datetime.now()) @@ -488,14 +487,14 @@ def execute_command( if proc.returncode != 0: msg = ",".join(result[1].split("\n")) - console.print(msg, style=defaults.error_style) + task_console.print(msg, style=defaults.error_style) raise exceptions.CommandCallError(msg) # for stderr for line in result[1].split("\n"): if line.strip() == "": continue - console.print(line, style=defaults.warning_style) + task_console.print(line, style=defaults.warning_style) output_parameters: Dict[str, Parameter] = {} metrics: Dict[str, Parameter] = {} @@ -506,7 +505,7 @@ def execute_command( continue logger.info(line) - console.print(line) + task_console.print(line) if line.strip() == collect_delimiter: # The lines from now on should be captured @@ -548,8 +547,8 @@ def execute_command( logger.exception(msg) logger.exception(e) - console.log(msg, style=defaults.error_style) - console.log(e, style=defaults.error_style) + task_console.log(msg, style=defaults.error_style) + task_console.log(e, style=defaults.error_style) attempt_log.status = defaults.FAIL