diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes.py index b1f3e622148ea..620f7d45378bf 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes.py @@ -484,3 +484,25 @@ def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str, if response["JobRun"]["JobRunState"] in ["FAILED", "SUCCEEDED"]: return response time.sleep(5) + + def _register_interruption_handler(self, context: OpExecutionContext, job_name: str, run_id: str): + """ + Creates a handler which will gracefully stop the Run in case of external termination. + It will stop the Glue job before doing so. + """ + + def handler(signum, frame): + context.log.warning(f"Dagster run interrupted! Stopping Glue job run {run_id}...") + response = self._client.batch_stop_job_run( + JobName=job_name, + JobRunIds=[run_id] + ) + runs = response["SuccessfulSubmissions"] + if len(runs) > 0: + context.log.warning(f"Successfully stopped Glue job run {run_id}.") + else: + context.log.warning(f"Something went wrong during run termination: {response['errors']}") + + raise KeyboardInterrupt + + signal.signal(signal.SIGINT, handler)