Skip to content

Commit

Permalink
[KYUUBI #6843] [FOLLOWUP] Fix 'query-timeout-thread' thread leak
Browse files Browse the repository at this point in the history
### Why are the changes needed?

If the session manager's ThreadPoolExecutor refuses to execute the asyncOperation, then we need to shut down the query-timeout-thread in the catch block. This should also be done in JDBC and the CHAT engine.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes #6873 from lsm1/branch-followup-6843.

Closes #6843

aed9088 [senmiaoliu] fix query timeout checker leak in chat engine and jdbc engine

Authored-by: senmiaoliu <[email protected]>
Signed-off-by: senmiaoliu <[email protected]>
  • Loading branch information
lsm1 committed Jan 10, 2025
1 parent a051253 commit 6221901
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kyuubi.engine.chat.operation

import org.apache.kyuubi.Logging
import java.util.concurrent.RejectedExecutionException

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.chat.provider.ChatProvider
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -41,9 +43,19 @@ class ExecuteStatement(
executeStatement()
}
}
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val chatSessionManager = session.sessionManager
val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.operation

import java.sql.{Connection, Statement, Types}
import java.util.concurrent.RejectedExecutionException

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
Expand Down Expand Up @@ -50,9 +51,19 @@ class ExecuteStatement(
executeStatement()
}
}
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
try {
val jdbcSessionManager = session.sessionManager
val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting query in background, query rejected", rejected)
setOperationException(ke)
shutdownTimeoutMonitor()
throw ke
}
} else {
executeStatement()
}
Expand Down

0 comments on commit 6221901

Please sign in to comment.