Skip to content

Commit

Permalink
[pipes] minor renames in dagster_pipes (#16917)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Renames in `dagster_pipes` for consistency:

- `DefaultPipesContextLoader` -> `PipesDefaultContextLoader`
- `EnvVarPipesParamsLoader` -> `PipesEnvVarParamsLoader`
- `DbfsPipesContextLoader` -> `PipesDbfsContextLoader`

## How I Tested These Changes

Existing test suite.
  • Loading branch information
smackesey authored Sep 29, 2023
1 parent 98436e0 commit 0870e20
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dagster_k8s import execute_k8s_job
from dagster_k8s.client import DagsterKubernetesClient
from dagster_k8s.pipes import PipesK8sClient, PipesK8sPodLogsMessageReader
from dagster_pipes import DefaultPipesContextLoader, PipesContextData
from dagster_pipes import PipesContextData, PipesDefaultContextLoader
from dagster_test.test_project import (
get_test_project_docker_image,
)
Expand Down Expand Up @@ -80,7 +80,7 @@ def inject_context(
)
self._client.core_api.create_namespaced_config_map(self._namespace, context_config_map_body)
try:
yield {DefaultPipesContextLoader.FILE_PATH_KEY: "/mnt/dagster/context.json"}
yield {PipesDefaultContextLoader.FILE_PATH_KEY: "/mnt/dagster/context.json"}
finally:
self._client.core_api.delete_namespaced_config_map(self._cm_name, self._namespace)

Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster-pipes/dagster_pipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def upload_messages_chunk(self, payload: IO, index: int) -> None:
# ########################


class DefaultPipesContextLoader(PipesContextLoader):
class PipesDefaultContextLoader(PipesContextLoader):
FILE_PATH_KEY = "path"
DIRECT_KEY = "data"

Expand Down Expand Up @@ -558,7 +558,7 @@ def write_message(self, message: PipesMessage) -> None:
self._stream.writelines((json.dumps(message), "\n"))


class EnvVarPipesParamsLoader(PipesParamsLoader):
class PipesEnvVarParamsLoader(PipesParamsLoader):
def load_context_params(self) -> PipesParams:
return _param_from_env_var("context")

Expand Down Expand Up @@ -617,7 +617,7 @@ def upload_messages_chunk(self, payload: IO, index: int) -> None:
# ########################


class DbfsPipesContextLoader(PipesContextLoader):
class PipesDbfsContextLoader(PipesContextLoader):
@contextmanager
def load_context(self, params: PipesParams) -> Iterator[PipesContextData]:
unmounted_path = _assert_env_param_type(params, "path", str, self.__class__)
Expand Down Expand Up @@ -653,10 +653,10 @@ def init_dagster_pipes(
return PipesContext.get()

if is_dagster_pipes_process():
params_loader = params_loader or EnvVarPipesParamsLoader()
params_loader = params_loader or PipesEnvVarParamsLoader()
context_params = params_loader.load_context_params()
messages_params = params_loader.load_messages_params()
context_loader = context_loader or DefaultPipesContextLoader()
context_loader = context_loader or PipesDefaultContextLoader()
message_writer = message_writer or PipesDefaultMessageWriter()
stack = ExitStack()
context_data = stack.enter_context(context_loader.load_context(context_params))
Expand Down
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

from dagster_pipes import (
PIPES_PROTOCOL_VERSION_FIELD,
DefaultPipesContextLoader,
PipesContextData,
PipesDefaultContextLoader,
PipesDefaultMessageWriter,
PipesExtras,
PipesParams,
Expand Down Expand Up @@ -66,7 +66,7 @@ def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesPara
with open(self._path, "w") as input_stream:
json.dump(context_data, input_stream)
try:
yield {DefaultPipesContextLoader.FILE_PATH_KEY: self._path}
yield {PipesDefaultContextLoader.FILE_PATH_KEY: self._path}
finally:
if os.path.exists(self._path):
os.remove(self._path)
Expand Down Expand Up @@ -115,7 +115,7 @@ def inject_context(
PipesParams: A dict of parameters that can be used by the external process to locate and
load the injected context data.
"""
yield {DefaultPipesContextLoader.DIRECT_KEY: context_data}
yield {PipesDefaultContextLoader.DIRECT_KEY: context_data}


@experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

def script_fn():
from dagster_pipes import (
DbfsPipesContextLoader,
PipesDbfsContextLoader,
PipesDbfsMessageWriter,
init_dagster_pipes,
)

context = init_dagster_pipes(
context_loader=DbfsPipesContextLoader(), message_writer=PipesDbfsMessageWriter()
context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter()
)

multiplier = context.get_extra("multiplier")
Expand Down

0 comments on commit 0870e20

Please sign in to comment.