[dagster-aws] reusable function for injecting env vars into EMR configurations (#26969)

## Summary & Motivation

This function will be reused downstack in the Pipes client for AWS EMR Containers.
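
For context, here is a minimal usage sketch (not part of this diff) of how the injection surfaces to users of `PipesEMRClient`: whatever `Configurations` are supplied with the job flow parameters get the Pipes bootstrap env vars merged in by `_enrich_params` before the job flow is started. The asset body and all parameter values below are illustrative assumptions, not code from this commit.

```python
# Hypothetical usage sketch; names and values are illustrative only.
from dagster import AssetExecutionContext, asset
from dagster_aws.pipes import PipesEMRClient


@asset
def emr_job(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
    return pipes_emr_client.run(
        context=context,
        run_job_flow_params={
            "Name": "dagster-pipes-example",
            "ReleaseLabel": "emr-7.1.0",
            # User-provided spark-defaults; Pipes env vars are merged into this
            # classification (plus spark-env, yarn-env, hadoop-env) before submission.
            "Configurations": [
                {
                    "Classification": "spark-defaults",
                    "Properties": {"spark.executor.memory": "2g"},
                }
            ],
            # Instances, Steps, and the remaining RunJobFlowInput fields are omitted here.
        },
    ).get_materialize_result()
```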

## How I Tested These Changes

Simple refactor, so no new tests were added (pyright is enough).
danielgafni authored Jan 14, 2025
1 parent 39923f7 commit 5e9b1e9
Showing 2 changed files with 75 additions and 63 deletions.
@@ -20,6 +20,7 @@
from dagster._core.pipes.utils import PipesEnvContextInjector, PipesSession, open_pipes_session

from dagster_aws.emr.emr import EMR_CLUSTER_TERMINATED_STATES
from dagster_aws.pipes.clients.utils import emr_inject_pipes_env_vars
from dagster_aws.pipes.message_readers import (
    PipesS3LogReader,
    PipesS3MessageReader,
@@ -37,38 +38,6 @@
)


def add_configuration(
    configurations: list["ConfigurationUnionTypeDef"],
    configuration: "ConfigurationUnionTypeDef",
):
    """Add a configuration to a list of EMR configurations, merging configurations with the same classification.
    This is necessary because EMR doesn't accept multiple configurations with the same classification.
    """
    for existing_configuration in configurations:
        if existing_configuration.get("Classification") is not None and existing_configuration.get(
            "Classification"
        ) == configuration.get("Classification"):
            properties = {**existing_configuration.get("Properties", {})}
            properties.update(configuration.get("Properties", {}))

            inner_configurations = cast(
                list["ConfigurationUnionTypeDef"], existing_configuration.get("Configurations", [])
            )

            for inner_configuration in cast(
                list["ConfigurationUnionTypeDef"], configuration.get("Configurations", [])
            ):
                add_configuration(inner_configurations, inner_configuration)

            existing_configuration["Properties"] = properties
            existing_configuration["Configurations"] = inner_configurations  # type: ignore

            break
    else:
        configurations.append(configuration)


@public
@experimental
class PipesEMRClient(PipesClient, TreatAsResourceParam):
@@ -162,39 +131,10 @@ def run(
    def _enrich_params(
        self, session: PipesSession, params: "RunJobFlowInputRequestTypeDef"
    ) -> "RunJobFlowInputRequestTypeDef":
-        # add Pipes env variables
-        pipes_env_vars = session.get_bootstrap_env_vars()
-
-        configurations = cast(list["ConfigurationUnionTypeDef"], params.get("Configurations", []))
-
-        # add all possible env vars to spark-defaults, spark-env, yarn-env, hadoop-env
-        # since we can't be sure which one will be used by the job
-        add_configuration(
-            configurations,
-            {
-                "Classification": "spark-defaults",
-                "Properties": {
-                    f"spark.yarn.appMasterEnv.{var}": value for var, value in pipes_env_vars.items()
-                },
-            },
-        )
-
-        for classification in ["spark-env", "yarn-env", "hadoop-env"]:
-            add_configuration(
-                configurations,
-                {
-                    "Classification": classification,
-                    "Configurations": [
-                        {
-                            "Classification": "export",
-                            "Properties": pipes_env_vars,
-                        }
-                    ],
-                },
-            )
-
-        params["Configurations"] = configurations
+        params["Configurations"] = emr_inject_pipes_env_vars(
+            session, cast(list["ConfigurationUnionTypeDef"], params.get("Configurations", []))
+        )

        tags = list(params.get("Tags", []))

        for key, value in session.default_remote_invocation_info.items():
@@ -0,0 +1,72 @@
from typing import TYPE_CHECKING, cast

from dagster._core.pipes.utils import PipesSession

if TYPE_CHECKING:
    from mypy_boto3_emr.type_defs import ConfigurationUnionTypeDef


def add_emr_configuration(
    configurations: list["ConfigurationUnionTypeDef"],
    configuration: "ConfigurationUnionTypeDef",
):
    """Add a configuration to a list of EMR configurations, merging configurations with the same classification.
    This is necessary because EMR doesn't accept multiple configurations with the same classification.
    """
    for existing_configuration in configurations:
        if existing_configuration.get("Classification") is not None and existing_configuration.get(
            "Classification"
        ) == configuration.get("Classification"):
            properties = {**existing_configuration.get("Properties", {})}
            properties.update(configuration.get("Properties", {}))

            inner_configurations = cast(
                list["ConfigurationUnionTypeDef"], existing_configuration.get("Configurations", [])
            )

            for inner_configuration in cast(
                list["ConfigurationUnionTypeDef"], configuration.get("Configurations", [])
            ):
                add_emr_configuration(inner_configurations, inner_configuration)

            existing_configuration["Properties"] = properties
            existing_configuration["Configurations"] = inner_configurations  # type: ignore

            break
    else:
        configurations.append(configuration)


def emr_inject_pipes_env_vars(
    session: PipesSession, configurations: list["ConfigurationUnionTypeDef"]
) -> list["ConfigurationUnionTypeDef"]:
    pipes_env_vars = session.get_bootstrap_env_vars()

    # add all possible env vars to spark-defaults, spark-env, yarn-env, hadoop-env
    # since we can't be sure which one will be used by the job
    add_emr_configuration(
        configurations,
        {
            "Classification": "spark-defaults",
            "Properties": {
                f"spark.yarn.appMasterEnv.{var}": value for var, value in pipes_env_vars.items()
            },
        },
    )

    for classification in ["spark-env", "yarn-env", "hadoop-env"]:
        add_emr_configuration(
            configurations,
            {
                "Classification": classification,
                "Configurations": [
                    {
                        "Classification": "export",
                        "Properties": pipes_env_vars,
                    }
                ],
            },
        )

    return configurations
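
For illustration, a minimal sketch of the merge behavior these helpers implement, assuming the new module is importable as `dagster_aws.pipes.clients.utils` (as the updated import above suggests); the env-var name and value below are placeholders rather than the output of a real `PipesSession`:

```python
# Illustrative sketch only: env-var values are placeholders; at runtime they come
# from PipesSession.get_bootstrap_env_vars().
from dagster_aws.pipes.clients.utils import add_emr_configuration

pipes_env_vars = {"DAGSTER_PIPES_CONTEXT": "<bootstrap-payload>"}

# EMR rejects duplicate classifications, so a second "spark-defaults" entry is
# merged into the existing one instead of being appended.
configurations = [
    {"Classification": "spark-defaults", "Properties": {"spark.executor.memory": "2g"}}
]
add_emr_configuration(
    configurations,
    {
        "Classification": "spark-defaults",
        "Properties": {f"spark.yarn.appMasterEnv.{k}": v for k, v in pipes_env_vars.items()},
    },
)
assert configurations == [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "2g",
            "spark.yarn.appMasterEnv.DAGSTER_PIPES_CONTEXT": "<bootstrap-payload>",
        },
        "Configurations": [],
    }
]

# emr_inject_pipes_env_vars applies the same idea for a whole session: it writes the
# bootstrap variables into spark-defaults (as spark.yarn.appMasterEnv.* properties)
# and into "export" blocks under spark-env, yarn-env, and hadoop-env, since it is
# not known in advance which classification the job will actually read.
```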
