diff --git a/python_modules/dagster/dagster/_core/pipes/subprocess.py b/python_modules/dagster/dagster/_core/pipes/subprocess.py index bd56eede481f7..cb897686d0dc7 100644 --- a/python_modules/dagster/dagster/_core/pipes/subprocess.py +++ b/python_modules/dagster/dagster/_core/pipes/subprocess.py @@ -1,3 +1,4 @@ +import os from subprocess import Popen from typing import Mapping, Optional, Sequence, Union @@ -22,15 +23,16 @@ @experimental -class _PipesSubprocess(PipesClient): +class _PipesSubprocessClient(PipesClient): """A pipes client that runs a subprocess with the given command and environment. By default parameters are injected via environment variables. Context is passed via a temp file, and structured messages are read from from a temp file. Args: - env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the - subprocess. + env (Optional[Mapping[str, str]]): The base environment variables to pass to the + subprocess, over layed with those set in the run call. Defaults to None, + inheriting the environment variables of the current process. cwd (Optional[str]): Working directory in which to launch the subprocess command. context_injector (Optional[PipesContextInjector]): A context injector to use to inject context into the subprocess. Defaults to :py:class:`PipesTempFileContextInjector`. @@ -45,9 +47,9 @@ def __init__( context_injector: Optional[PipesContextInjector] = None, message_reader: Optional[PipesMessageReader] = None, ): - self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str) - self.cwd = check.opt_str_param(cwd, "cwd") - self.context_injector = ( + self._base_env = check.opt_nullable_mapping_param(env, "env", key_type=str, value_type=str) + self._cwd = check.opt_str_param(cwd, "cwd") + self._context_injector = ( check.opt_inst_param( context_injector, "context_injector", @@ -55,7 +57,7 @@ def __init__( ) or PipesTempFileContextInjector() ) - self.message_reader = ( + self._message_reader = ( check.opt_inst_param( message_reader, "message_reader", @@ -84,7 +86,8 @@ def run( command (Union[str, Sequence[str]]): The command to run. Will be passed to `subprocess.Popen()`. context (OpExecutionContext): The context from the executing op or asset. extras (Optional[PipesExtras]): An optional dict of extra parameters to pass to the subprocess. - env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess. + env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the + subprocess, atop those set at the client level. cwd (Optional[str]): Working directory in which to launch the subprocess command. Returns: @@ -93,17 +96,17 @@ def run( """ with open_pipes_session( context=context, - context_injector=self.context_injector, - message_reader=self.message_reader, + context_injector=self._context_injector, + message_reader=self._message_reader, extras=extras, ) as pipes_session: process = Popen( command, - cwd=cwd or self.cwd, + cwd=cwd or self._cwd, env={ - **pipes_session.get_bootstrap_env_vars(), - **self.env, + **(self._base_env if self._base_env is not None else os.environ), **(env or {}), + **pipes_session.get_bootstrap_env_vars(), }, ) process.wait() @@ -114,4 +117,4 @@ def run( return PipesClientCompletedInvocation(tuple(pipes_session.get_results())) -PipesSubprocessClient = ResourceParam[_PipesSubprocess] +PipesSubprocessClient = ResourceParam[_PipesSubprocessClient] diff --git a/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py b/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py index 183b6ba5e087d..f1456fbab0319 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py +++ b/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py @@ -47,6 +47,7 @@ open_pipes_session, ) from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus +from dagster._utils.env import environ from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader from moto.server import ThreadedMotoServer @@ -536,3 +537,55 @@ def foo(context: OpExecutionContext, pipes_client: PipesSubprocessClient): assert len(pipes_msgs) == 2 assert "successfully opened" in pipes_msgs[0] assert "did not receive closed message" in pipes_msgs[1] + + +def test_subprocess_env_precedence(): + def script_fn(): + import os + + from dagster_pipes import open_dagster_pipes + + with open_dagster_pipes() as context: + context.report_asset_materialization( + metadata={ + "A": os.getenv("A"), + "B": os.getenv("B"), + "C": os.getenv("C"), + }, + ) + + @asset + def env_test(context, pipes_client: PipesSubprocessClient): + with temp_script(script_fn) as script_path: + cmd = [_PYTHON_EXECUTABLE, script_path] + return pipes_client.run( + env={"C": "callsite"}, + command=cmd, + context=context, + ).get_results() + + # test that client vals override outer env + with environ({"A": "parent", "B": "parent"}): + result = materialize( + [env_test], + resources={"pipes_client": PipesSubprocessClient(env={"A": "client"})}, + ) + assert result.success + mat_evts = result.get_asset_materialization_events() + assert len(mat_evts) == 1 + assert mat_evts[0].materialization.metadata["A"].value == "client" + # setting base env on client prevents parent env inheritance + assert mat_evts[0].materialization.metadata["B"].value == None + assert mat_evts[0].materialization.metadata["C"].value == "callsite" + + # test that callsite vals override client env (and outer env not inherited) + result = materialize( + [env_test], + resources={"pipes_client": PipesSubprocessClient()}, + ) + assert result.success + mat_evts = result.get_asset_materialization_events() + assert len(mat_evts) == 1 + assert mat_evts[0].materialization.metadata["A"].value == "parent" + assert mat_evts[0].materialization.metadata["B"].value == "parent" + assert mat_evts[0].materialization.metadata["C"].value == "callsite"