Skip to content

Commit

Permalink
[dagster-aws] attach tags and metadata in PipesGlueClient (#25842)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Add AWS Glue metadata injection to `PipesGlueClient`

Also, when CloudWatch logging is not enabled for a Glue job, the client
now logs a warning message instead of failing.

## Changelog
[dagster-aws] `PipesGlueClient` now attaches AWS Glue metadata to
Dagster results produced during Pipes invocation
  • Loading branch information
danielgafni authored Nov 15, 2024
1 parent 8cdc4dc commit b087b74
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import boto3
import dagster._check as check
from botocore.exceptions import ClientError
from dagster import PipesClient
from dagster import MetadataValue, PipesClient
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
Expand All @@ -21,7 +22,7 @@

if TYPE_CHECKING:
from mypy_boto3_glue.client import GlueClient
from mypy_boto3_glue.type_defs import StartJobRunRequestRequestTypeDef
from mypy_boto3_glue.type_defs import GetJobRunResponseTypeDef, StartJobRunRequestRequestTypeDef


@experimental
Expand Down Expand Up @@ -93,6 +94,8 @@ def run(

params["Arguments"].update(pipes_args) # pyright: ignore (reportAttributeAccessIssue)

# Note: AWS Glue doesn't have a concept of Tags so we can't inject Dagster tags from session.default_remote_invocation_info

try:
run_id = self._client.start_job_run(**params)["JobRunId"]

Expand All @@ -106,7 +109,6 @@ def run(
raise

response = self._client.get_job_run(JobName=job_name, RunId=run_id)
log_group = response["JobRun"]["LogGroupName"] # pyright: ignore (reportTypedDictNotRequiredAccess)
context.log.info(f"Started AWS Glue job {job_name} run: {run_id}")

try:
Expand All @@ -116,7 +118,8 @@ def run(
self._terminate_job_run(context=context, job_name=job_name, run_id=run_id)
raise

if status := response["JobRun"]["JobRunState"] != "SUCCEEDED":
status = response["JobRun"]["JobRunState"] # pyright: ignore (reportOptionalSubscript)
if status != "SUCCEEDED":
raise RuntimeError(
f"Glue job {job_name} run {run_id} completed with status {status} :\n{response['JobRun'].get('ErrorMessage')}"
)
Expand All @@ -125,24 +128,34 @@ def run(

# Glue is dumping both stdout and stderr to the same log group called */output
if isinstance(self._message_reader, PipesCloudWatchMessageReader):
session.report_launched(
{
"extras": {"log_group": f"{log_group}/output", "log_stream": run_id},
}
)
if isinstance(self._message_reader, PipesCloudWatchMessageReader):
self._message_reader.add_log_reader(
PipesCloudWatchLogReader(
client=self._message_reader.client,
log_group=f"{log_group}/output",
log_stream=run_id,
start_time=int(session.created_at.timestamp() * 1000),
),
)

return PipesClientCompletedInvocation(session)

def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str, Any]:
log_group = response.get("JobRun", {}).get("LogGroupName")

if log_group is None:
context.log.warning(
f"CloudWatch logging is not enabled for Glue job {job_name}. Messages and logs will not be available."
)
else:
session.report_launched(
{
"extras": {"log_group": f"{log_group}/output", "log_stream": run_id},
}
)
self._message_reader.add_log_reader(
PipesCloudWatchLogReader(
client=self._message_reader.client,
log_group=f"{log_group}/output",
log_stream=run_id,
start_time=int(session.created_at.timestamp() * 1000),
),
)

return PipesClientCompletedInvocation(
session, metadata=self._extract_dagster_metadata(response)
)

def _wait_for_job_run_completion(
self, job_name: str, run_id: str
) -> "GetJobRunResponseTypeDef":
while True:
response = self._client.get_job_run(JobName=job_name, RunId=run_id)
# https://docs.aws.amazon.com/glue/latest/dg/job-run-statuses.html
Expand All @@ -153,9 +166,27 @@ def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str,
"TIMEOUT",
"ERROR",
]:
return response # pyright: ignore (reportReturnType)
return response
time.sleep(5)

def _extract_dagster_metadata(self, response: "GetJobRunResponseTypeDef") -> RawMetadataMapping:
job_run = response["JobRun"]

metadata: RawMetadataMapping = {}

if job_run_id := job_run.get("Id"):
metadata["AWS Glue Job Run ID"] = job_run_id

if job_name := job_run.get("JobName"):
metadata["AWS Glue Job Name"] = job_name

if job_run_id is not None and job_name is not None:
metadata["AWS Glue Job Run URL"] = MetadataValue.url(
f"https://{self._client.meta.region_name}.console.aws.amazon.com/gluestudio/home?region={self._client.meta.region_name}#/job/{job_name}/run/{job_run_id}"
)

return metadata

def _terminate_job_run(
self, context: Union[OpExecutionContext, AssetExecutionContext], job_name: str, run_id: str
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def __init__(

self._job_runs: Dict[str, SimulatedJobRun] = {} # mapping of JobRunId to SimulatedJobRun

@property
def meta(self):
return self.glue_client.meta

def get_job_run(self, JobName: str, RunId: str):
# get original response
response = self.glue_client.get_job_run(JobName=JobName, RunId=RunId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ def foo(context: AssetExecutionContext, pipes_glue_client: PipesGlueClient):
assert mat and mat.asset_materialization
assert isinstance(mat.asset_materialization.metadata["bar"], MarkdownMetadataValue)
assert mat.asset_materialization.metadata["bar"].value == "baz"
assert "AWS Glue Job Run ID" in mat.asset_materialization.metadata
assert mat.asset_materialization.tags
assert mat.asset_materialization.tags[DATA_VERSION_TAG] == "alpha"
assert mat.asset_materialization.tags[DATA_VERSION_IS_USER_PROVIDED_TAG]
Expand Down

0 comments on commit b087b74

Please sign in to comment.