Add PipesCloudWatchMessageReader
danielgafni committed Aug 7, 2024
1 parent 06d5afe commit 2f62c83
Showing 1 changed file with 65 additions and 4 deletions.
69 changes: 65 additions & 4 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes.py
@@ -13,6 +13,7 @@
from dagster import PipesClient
from dagster._annotations import experimental
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.pipes.client import (
PipesClientCompletedInvocation,
@@ -181,6 +182,33 @@ def consume_cloudwatch_logs(
def no_messages_debug_text(self) -> str:
return "Attempted to read messages by extracting them from the tail of CloudWatch logs directly."

def _get_all_cloudwatch_events(
self,
log_group: str,
log_stream: str,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> Generator[List[CloudWatchEvent], None, None]:
"""Returns batches of CloudWatch events until the stream is complete or end_time."""
params = {
"logGroupName": log_group,
"logStreamName": log_stream,
}

if start_time is not None:
params["startTime"] = start_time
if end_time is not None:
params["endTime"] = end_time

response = self.client.get_log_events(**params)

while events := response.get("events"):
yield events

params["nextToken"] = response["nextForwardToken"]

response = self.client.get_log_events(**params)


class PipesLambdaEventContextInjector(PipesEnvContextInjector):
def no_messages_debug_text(self) -> str:
@@ -288,7 +316,7 @@ class PipesGlueClient(PipesClient, TreatAsResourceParam):
client (boto3.client): The boto Glue client used to call invoke.
context_injector (Optional[PipesContextInjector]): A context injector to use to inject
context into the Glue job, for example, :py:class:`PipesGlueContextInjector`.
message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
message_reader (Optional[PipesGlueMessageReader]): A message reader to use to read messages
from the glue job run. Defaults to :py:class:`PipesGlueLogsMessageReader`.
"""

@@ -399,9 +427,28 @@ def run(
)
raise

# TODO: get logs from CloudWatch. there are 2 separate streams for stdout and driver stderr to read from
# the log group can be found in the response from start_job_run, and the log stream is the job run id
# worker logs have log streams like: <job_id>_<worker_id> but we probably don't need to read those
response = self._client.get_job_run(JobName=job_name, RunId=run_id)
log_group = response["JobRun"]["LogGroupName"]
context.log.info(f"Started AWS Glue job {job_name} run: {run_id}")

try:
response = self._wait_for_job_run_completion(job_name, run_id)
except DagsterExecutionInterruptedError:
self._terminate_job_run(context=context, job_name=job_name, run_id=run_id)
raise

if response["JobRun"]["JobRunState"] == "FAILED":
raise RuntimeError(
f"Glue job {job_name} run {run_id} failed:\n{response['JobRun']['ErrorMessage']}"
)
else:
context.log.info(f"Glue job {job_name} run {run_id} completed successfully")

if isinstance(self._message_reader, PipesCloudWatchMessageReader):
# TODO: receive messages from a background thread in real-time
self._message_reader.consume_cloudwatch_logs(
f"{log_group}/output", run_id, start_time=int(start_timestamp)
)

# should probably have a way to return the lambda result payload
return PipesClientCompletedInvocation(session)
@@ -412,3 +459,17 @@ def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str,
if response["JobRun"]["JobRunState"] in ["FAILED", "SUCCEEDED"]:
return response
time.sleep(5)

def _terminate_job_run(self, context: OpExecutionContext, job_name: str, run_id: str):
"""Creates a handler which will gracefully stop the Run in case of external termination.
It will stop the Glue job before doing so.
"""
context.log.warning(f"Dagster run interrupted! Stopping Glue job run {run_id}...")
response = self._client.batch_stop_job_run(JobName=job_name, JobRunIds=[run_id])
runs = response["SuccessfulSubmissions"]
if len(runs) > 0:
context.log.warning(f"Successfully stopped Glue job run {run_id}.")
else:
context.log.warning(
f"Something went wrong during run termination: {response['errors']}"
)
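Usage note (not part of this commit): a minimal wiring sketch of how the new PipesCloudWatchMessageReader might be combined with PipesGlueClient from a Dagster asset. The bucket and job names are placeholders, PipesS3ContextInjector is just one possible context injector, and the exact run() parameters (job_name here) may differ between dagster-aws versions; treat this as an illustrative sketch rather than canonical usage.

import boto3
from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import (
    PipesCloudWatchMessageReader,
    PipesGlueClient,
    PipesS3ContextInjector,
)


@asset
def glue_pipes_asset(context: AssetExecutionContext, glue_client: PipesGlueClient):
    # Start the Glue job; after it finishes, the CloudWatch reader forwards the
    # job's output log stream back to Dagster as Pipes messages.
    return glue_client.run(
        context=context,
        job_name="my-glue-job",  # placeholder job name
    ).get_materialize_result()


defs = Definitions(
    assets=[glue_pipes_asset],
    resources={
        "glue_client": PipesGlueClient(
            client=boto3.client("glue"),
            context_injector=PipesS3ContextInjector(
                client=boto3.client("s3"),
                bucket="my-pipes-context-bucket",  # placeholder bucket
            ),
            # The reader added in this commit: reads Pipes messages from the
            # job run's CloudWatch output log stream after the run completes.
            message_reader=PipesCloudWatchMessageReader(client=boto3.client("logs")),
        )
    },
)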
