From 78a9fde09a36aed927a97c1005df692d0d24da35 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 9 Dec 2024 17:13:45 +0800 Subject: [PATCH 1/5] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak --- .../kyuubi/engine/flink/operation/PlanOnlyStatement.scala | 5 ++++- .../apache/kyuubi/engine/spark/operation/ExecutePython.scala | 1 + .../apache/kyuubi/engine/spark/operation/ExecuteScala.scala | 1 + .../kyuubi/engine/spark/operation/ExecuteStatement.scala | 1 + .../kyuubi/engine/trino/operation/ExecuteStatement.scala | 1 + .../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 5 ++++- 6 files changed, 12 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index 1284bfd73e6..b3b076f1288 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -67,7 +67,10 @@ class PlanOnlyStatement( statement) resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout); - case _ => explainOperation(statement) + shutdownTimeoutMonitor + case _ => + explainOperation(statement) + shutdownTimeoutMonitor } } catch { onError() diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index a3e090d2326..593a78e42b4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -127,6 +127,7 @@ class ExecutePython( val ke = KyuubiSQLException("Error submitting python in background", rejected) setOperationException(ke) + shutdownTimeoutMonitor throw ke } } else { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index e8335e549ea..a5fd68804f0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -154,6 +154,7 @@ class ExecuteScala( val ke = KyuubiSQLException("Error submitting scala in background", rejected) setOperationException(ke) + shutdownTimeoutMonitor throw ke } } else { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 53f02b2fbf6..40657828c98 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -124,6 +124,7 @@ class ExecuteStatement( val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) + shutdownTimeoutMonitor throw ke } } else { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 4f1b42e1d1b..3ecc72b772a 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -76,6 +76,7 @@ class ExecuteStatement( val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) + shutdownTimeoutMonitor throw ke } } else { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 02c22e4f6a7..c9f4a74cff4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -172,7 +172,10 @@ class ExecuteStatement( try { val opHandle = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(opHandle) - } catch onError("submitting query in background, query rejected") + } catch { + shutdownTimeoutMonitor + onError("submitting query in background, query rejected") + } if (!shouldRunAsync) getBackgroundHandle.get() } From 9e1a015f6984c5617231caa1516b3a8fd77c7f3d Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 9 Dec 2024 17:17:18 +0800 Subject: [PATCH 2/5] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak --- .../kyuubi/engine/flink/operation/PlanOnlyStatement.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index b3b076f1288..0f90ce01028 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -67,12 +67,11 @@ class PlanOnlyStatement( statement) resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout); - shutdownTimeoutMonitor case _ => explainOperation(statement) - shutdownTimeoutMonitor } } catch { + shutdownTimeoutMonitor onError() } } From ef1f66bb55677ac7aab3e6bdf0c9886e6ec9eb37 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 9 Dec 2024 18:05:27 +0800 Subject: [PATCH 3/5] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak --- .../kyuubi/engine/flink/operation/PlanOnlyStatement.scala | 2 +- .../apache/kyuubi/engine/spark/operation/ExecutePython.scala | 2 +- .../org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala | 2 +- .../apache/kyuubi/engine/spark/operation/ExecuteStatement.scala | 2 +- .../apache/kyuubi/engine/trino/operation/ExecuteStatement.scala | 2 +- .../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index 0f90ce01028..66e54428d76 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -71,7 +71,7 @@ class PlanOnlyStatement( explainOperation(statement) } } catch { - shutdownTimeoutMonitor + shutdownTimeoutMonitor() onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index 593a78e42b4..6cf525924b0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -127,7 +127,7 @@ class ExecutePython( val ke = KyuubiSQLException("Error submitting python in background", rejected) setOperationException(ke) - shutdownTimeoutMonitor + shutdownTimeoutMonitor() throw ke } } else { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index a5fd68804f0..7db33f7668e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -154,7 +154,7 @@ class ExecuteScala( val ke = KyuubiSQLException("Error submitting scala in background", rejected) setOperationException(ke) - shutdownTimeoutMonitor + shutdownTimeoutMonitor() throw ke } } else { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 40657828c98..7cb2dee3656 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -124,7 +124,7 @@ class ExecuteStatement( val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) - shutdownTimeoutMonitor + shutdownTimeoutMonitor() throw ke } } else { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 3ecc72b772a..ad42d9b05a8 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -76,7 +76,7 @@ class ExecuteStatement( val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) - shutdownTimeoutMonitor + shutdownTimeoutMonitor() throw ke } } else { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index c9f4a74cff4..27f0b1818be 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -173,7 +173,7 @@ class ExecuteStatement( val opHandle = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(opHandle) } catch { - shutdownTimeoutMonitor + shutdownTimeoutMonitor() onError("submitting query in background, query rejected") } From 4b3417f21f58fc3f29bc67f8e5c8fc2420f4d49f Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 16 Dec 2024 14:46:31 +0800 Subject: [PATCH 4/5] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak --- .../apache/kyuubi/engine/flink/operation/FlinkOperation.scala | 1 + .../apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala | 1 - .../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 1 - .../main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala | 1 + 4 files changed, 2 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index df067a888c6..95457ecaa7e 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -163,6 +163,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e) setOperationException(ke) setState(OperationState.ERROR) + shutdownTimeoutMonitor() throw ke } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index 66e54428d76..276aafb2314 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -71,7 +71,6 @@ class PlanOnlyStatement( explainOperation(statement) } } catch { - shutdownTimeoutMonitor() onError() } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 27f0b1818be..46ca2e8f6c0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -173,7 +173,6 @@ class ExecuteStatement( val opHandle = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(opHandle) } catch { - shutdownTimeoutMonitor() onError("submitting query in background, query rejected") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index a543bddb6c0..9de1dfb2b96 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -95,6 +95,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi } setOperationException(ke) setState(OperationState.ERROR) + shutdownTimeoutMonitor() throw ke } } From 9107a300eaccdbf69a61bc2ac2328437eb63edd8 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 16 Dec 2024 15:08:13 +0800 Subject: [PATCH 5/5] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak --- .../kyuubi/engine/flink/operation/PlanOnlyStatement.scala | 3 +-- .../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index 276aafb2314..1284bfd73e6 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -67,8 +67,7 @@ class PlanOnlyStatement( statement) resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout); - case _ => - explainOperation(statement) + case _ => explainOperation(statement) } } catch { onError() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 46ca2e8f6c0..02c22e4f6a7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -172,9 +172,7 @@ class ExecuteStatement( try { val opHandle = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(opHandle) - } catch { - onError("submitting query in background, query rejected") - } + } catch onError("submitting query in background, query rejected") if (!shouldRunAsync) getBackgroundHandle.get() }