diff --git a/integration_tests/test_suites/k8s-test-suite/tests/test_external_asset.py b/integration_tests/test_suites/k8s-test-suite/tests/test_external_asset.py index a109970bf3091..8ae3d8fd81b0c 100644 --- a/integration_tests/test_suites/k8s-test-suite/tests/test_external_asset.py +++ b/integration_tests/test_suites/k8s-test-suite/tests/test_external_asset.py @@ -11,7 +11,7 @@ from dagster_k8s import execute_k8s_job from dagster_k8s.client import DagsterKubernetesClient from dagster_k8s.pipes import PipesK8sClient, PipesK8sPodLogsMessageReader -from dagster_pipes import DefaultPipesContextLoader, PipesContextData +from dagster_pipes import PipesContextData, PipesDefaultContextLoader from dagster_test.test_project import ( get_test_project_docker_image, ) @@ -80,7 +80,7 @@ def inject_context( ) self._client.core_api.create_namespaced_config_map(self._namespace, context_config_map_body) try: - yield {DefaultPipesContextLoader.FILE_PATH_KEY: "/mnt/dagster/context.json"} + yield {PipesDefaultContextLoader.FILE_PATH_KEY: "/mnt/dagster/context.json"} finally: self._client.core_api.delete_namespaced_config_map(self._cm_name, self._namespace) diff --git a/python_modules/dagster-pipes/dagster_pipes/__init__.py b/python_modules/dagster-pipes/dagster_pipes/__init__.py index 38b3c025a4f94..b1de05881b10b 100644 --- a/python_modules/dagster-pipes/dagster_pipes/__init__.py +++ b/python_modules/dagster-pipes/dagster_pipes/__init__.py @@ -491,7 +491,7 @@ def upload_messages_chunk(self, payload: IO, index: int) -> None: # ######################## -class DefaultPipesContextLoader(PipesContextLoader): +class PipesDefaultContextLoader(PipesContextLoader): FILE_PATH_KEY = "path" DIRECT_KEY = "data" @@ -558,7 +558,7 @@ def write_message(self, message: PipesMessage) -> None: self._stream.writelines((json.dumps(message), "\n")) -class EnvVarPipesParamsLoader(PipesParamsLoader): +class PipesEnvVarParamsLoader(PipesParamsLoader): def load_context_params(self) -> PipesParams: return _param_from_env_var("context") @@ -617,7 +617,7 @@ def upload_messages_chunk(self, payload: IO, index: int) -> None: # ######################## -class DbfsPipesContextLoader(PipesContextLoader): +class PipesDbfsContextLoader(PipesContextLoader): @contextmanager def load_context(self, params: PipesParams) -> Iterator[PipesContextData]: unmounted_path = _assert_env_param_type(params, "path", str, self.__class__) @@ -653,10 +653,10 @@ def init_dagster_pipes( return PipesContext.get() if is_dagster_pipes_process(): - params_loader = params_loader or EnvVarPipesParamsLoader() + params_loader = params_loader or PipesEnvVarParamsLoader() context_params = params_loader.load_context_params() messages_params = params_loader.load_messages_params() - context_loader = context_loader or DefaultPipesContextLoader() + context_loader = context_loader or PipesDefaultContextLoader() message_writer = message_writer or PipesDefaultMessageWriter() stack = ExitStack() context_data = stack.enter_context(context_loader.load_context(context_params)) diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py index 44aa76edb1a3c..92f88d65fcf53 100644 --- a/python_modules/dagster/dagster/_core/pipes/utils.py +++ b/python_modules/dagster/dagster/_core/pipes/utils.py @@ -11,8 +11,8 @@ from dagster_pipes import ( PIPES_PROTOCOL_VERSION_FIELD, - DefaultPipesContextLoader, PipesContextData, + PipesDefaultContextLoader, PipesDefaultMessageWriter, PipesExtras, PipesParams, @@ -66,7 +66,7 @@ def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesPara with open(self._path, "w") as input_stream: json.dump(context_data, input_stream) try: - yield {DefaultPipesContextLoader.FILE_PATH_KEY: self._path} + yield {PipesDefaultContextLoader.FILE_PATH_KEY: self._path} finally: if os.path.exists(self._path): os.remove(self._path) @@ -115,7 +115,7 @@ def inject_context( PipesParams: A dict of parameters that can be used by the external process to locate and load the injected context data. """ - yield {DefaultPipesContextLoader.DIRECT_KEY: context_data} + yield {PipesDefaultContextLoader.DIRECT_KEY: context_data} @experimental diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py index 59e56f405ea57..b80352d3779d9 100644 --- a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py +++ b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py @@ -17,13 +17,13 @@ def script_fn(): from dagster_pipes import ( - DbfsPipesContextLoader, + PipesDbfsContextLoader, PipesDbfsMessageWriter, init_dagster_pipes, ) context = init_dagster_pipes( - context_loader=DbfsPipesContextLoader(), message_writer=PipesDbfsMessageWriter() + context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter() ) multiplier = context.get_extra("multiplier")