Skip to content

Commit

Permalink
add Dagster run interruption handler to AWS Glue Pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Jul 31, 2024
1 parent 18aafe3 commit 6679732
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,25 @@ def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str,
if response["JobRun"]["JobRunState"] in ["FAILED", "SUCCEEDED"]:
return response
time.sleep(5)

def _register_interruption_handler(self, context: OpExecutionContext, job_name: str, run_id: str):
"""
Creates a handler which will gracefully stop the Run in case of external termination.
It will stop the Glue job before doing so.
"""

def handler(signum, frame):
context.log.warning(f"Dagster run interrupted! Stopping Glue job run {run_id}...")
response = self._client.batch_stop_job_run(
JobName=job_name,
JobRunIds=[run_id]
)
runs = response["SuccessfulSubmissions"]
if len(runs) > 0:
context.log.warning(f"Successfully stopped Glue job run {run_id}.")
else:
context.log.warning(f"Something went wrong during run termination: {response['errors']}")

raise KeyboardInterrupt

signal.signal(signal.SIGINT, handler)

0 comments on commit 6679732

Please sign in to comment.