Skip to content

Commit

Permalink
[pipes] change env vars scheme for subprocess client
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Oct 30, 2023
1 parent 3f31cf3 commit 07a56d7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 14 deletions.
31 changes: 17 additions & 14 deletions python_modules/dagster/dagster/_core/pipes/subprocess.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from subprocess import Popen
from typing import Mapping, Optional, Sequence, Union

Expand All @@ -22,15 +23,16 @@


@experimental
class _PipesSubprocess(PipesClient):
class _PipesSubprocessClient(PipesClient):
"""A pipes client that runs a subprocess with the given command and environment.
By default parameters are injected via environment variables. Context is passed via
a temp file, and structured messages are read from a temp file.
Args:
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the
subprocess.
env (Optional[Mapping[str, str]]): The base environment variables to pass to the
subprocess, overlaid with those set in the run call. Defaults to None,
inheriting the environment variables of the current process.
cwd (Optional[str]): Working directory in which to launch the subprocess command.
context_injector (Optional[PipesContextInjector]): A context injector to use to inject
context into the subprocess. Defaults to :py:class:`PipesTempFileContextInjector`.
Expand All @@ -45,17 +47,17 @@ def __init__(
context_injector: Optional[PipesContextInjector] = None,
message_reader: Optional[PipesMessageReader] = None,
):
self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str)
self.cwd = check.opt_str_param(cwd, "cwd")
self.context_injector = (
self._base_env = check.opt_nullable_mapping_param(env, "env", key_type=str, value_type=str)
self._cwd = check.opt_str_param(cwd, "cwd")
self._context_injector = (
check.opt_inst_param(
context_injector,
"context_injector",
PipesContextInjector,
)
or PipesTempFileContextInjector()
)
self.message_reader = (
self._message_reader = (
check.opt_inst_param(
message_reader,
"message_reader",
Expand Down Expand Up @@ -84,7 +86,8 @@ def run(
command (Union[str, Sequence[str]]): The command to run. Will be passed to `subprocess.Popen()`.
context (OpExecutionContext): The context from the executing op or asset.
extras (Optional[PipesExtras]): An optional dict of extra parameters to pass to the subprocess.
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess.
env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the
subprocess, atop those set at the client level.
cwd (Optional[str]): Working directory in which to launch the subprocess command.
Returns:
Expand All @@ -93,17 +96,17 @@ def run(
"""
with open_pipes_session(
context=context,
context_injector=self.context_injector,
message_reader=self.message_reader,
context_injector=self._context_injector,
message_reader=self._message_reader,
extras=extras,
) as pipes_session:
process = Popen(
command,
cwd=cwd or self.cwd,
cwd=cwd or self._cwd,
env={
**pipes_session.get_bootstrap_env_vars(),
**self.env,
**(self._base_env if self._base_env is not None else os.environ),
**(env or {}),
**pipes_session.get_bootstrap_env_vars(),
},
)
process.wait()
Expand All @@ -114,4 +117,4 @@ def run(
return PipesClientCompletedInvocation(tuple(pipes_session.get_results()))


PipesSubprocessClient = ResourceParam[_PipesSubprocess]
PipesSubprocessClient = ResourceParam[_PipesSubprocessClient]
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
open_pipes_session,
)
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster._utils.env import environ
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader
from moto.server import ThreadedMotoServer

Expand Down Expand Up @@ -536,3 +537,55 @@ def foo(context: OpExecutionContext, pipes_client: PipesSubprocessClient):
assert len(pipes_msgs) == 2
assert "successfully opened" in pipes_msgs[0]
assert "did not receive closed message" in pipes_msgs[1]


def test_subprocess_env_precedence():
    """Check how the subprocess client layers environment variables.

    The launched script reports the values it sees for ``A``, ``B`` and ``C``
    as materialization metadata. Two client configurations are exercised while
    the parent process has ``A`` and ``B`` set to ``"parent"``:

    * a client constructed with ``env={"A": "client"}``: the client-level env
      replaces the parent environment, so ``B`` is not visible to the script;
    * a client constructed without ``env``: the parent environment is
      inherited.

    In both cases the ``env`` passed at the ``run`` call site supplies ``C``.
    """

    def script_fn():
        import os

        from dagster_pipes import open_dagster_pipes

        with open_dagster_pipes() as context:
            context.report_asset_materialization(
                metadata={
                    "A": os.getenv("A"),
                    "B": os.getenv("B"),
                    "C": os.getenv("C"),
                },
            )

    @asset
    def env_test(context, pipes_client: PipesSubprocessClient):
        with temp_script(script_fn) as script_path:
            cmd = [_PYTHON_EXECUTABLE, script_path]
            return pipes_client.run(
                env={"C": "callsite"},
                command=cmd,
                context=context,
            ).get_results()

    with environ({"A": "parent", "B": "parent"}):
        # A client-level env overrides the parent process environment.
        result = materialize(
            [env_test],
            resources={"pipes_client": PipesSubprocessClient(env={"A": "client"})},
        )
        assert result.success
        mat_evts = result.get_asset_materialization_events()
        assert len(mat_evts) == 1
        metadata = mat_evts[0].materialization.metadata
        assert metadata["A"].value == "client"
        # Setting a base env on the client prevents parent env inheritance.
        assert metadata["B"].value is None
        assert metadata["C"].value == "callsite"

        # Without a client-level env the parent environment is inherited, and
        # the call-site env is still layered on top of it.
        result = materialize(
            [env_test],
            resources={"pipes_client": PipesSubprocessClient()},
        )
        assert result.success
        mat_evts = result.get_asset_materialization_events()
        assert len(mat_evts) == 1
        metadata = mat_evts[0].materialization.metadata
        assert metadata["A"].value == "parent"
        assert metadata["B"].value == "parent"
        assert metadata["C"].value == "callsite"

0 comments on commit 07a56d7

Please sign in to comment.