From 51c3f290260255cc18a35b4e07321f348afedb00 Mon Sep 17 00:00:00 2001 From: Chris White Date: Wed, 14 Aug 2024 11:27:46 -0500 Subject: [PATCH] First order deep merge of flow run overrides with deployment overrides (#14923) --- src/prefect/workers/base.py | 7 ++++++- tests/workers/test_base_worker.py | 34 ++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index 7ece104c426b..f92bae1c4a0e 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -981,7 +981,12 @@ async def _get_configuration( deployment_vars = deployment.job_variables or {} flow_run_vars = flow_run.job_variables or {} - job_variables = {**deployment_vars, **flow_run_vars} + job_variables = {**deployment_vars} + + # merge environment variables carefully, otherwise full override + if isinstance(job_variables.get("env"), dict): + job_variables["env"].update(flow_run_vars.pop("env", {})) + job_variables.update(flow_run_vars) configuration = await self.job_configuration.from_template_and_values( base_job_template=self._work_pool.base_job_template, diff --git a/tests/workers/test_base_worker.py b/tests/workers/test_base_worker.py index 02c8567d46b5..a68c0a55db05 100644 --- a/tests/workers/test_base_worker.py +++ b/tests/workers/test_base_worker.py @@ -30,7 +30,7 @@ ) from prefect.flows import flow from prefect.server import models -from prefect.server.schemas.core import Flow +from prefect.server.schemas.core import Deployment, Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.server.schemas.states import StateType from prefect.settings import ( @@ -2073,3 +2073,35 @@ async def test_worker_last_polled_health_check( # cleanup mock pendulum.set_test_now() + + +async def test_env_merge_logic_is_deep(prefect_client, session, flow): + deployment = await models.deployments.create_deployment( + session=session, + deployment=Deployment( + name="env-testing", + tags=["test"], + flow_id=flow.id, + schedule=None, + path="./subdir", + entrypoint="/file.py:flow", + parameter_openapi_schema=None, + job_variables={"env": {"test-var": "foo"}}, + ), + ) + await session.commit() + + flow_run = await prefect_client.create_flow_run_from_deployment( + deployment.id, + state=Pending(), + job_variables={"env": {"another-var": "boo"}}, + ) + async with WorkerTestImpl( + name="test", + work_pool_name="test-work-pool", + ) as worker: + await worker.sync_with_backend() + config = await worker._get_configuration(flow_run) + + assert config.env["test-var"] == "foo" + assert config.env["another-var"] == "boo"