cp (#2792)
shiyuhang0 authored Oct 8, 2024
1 parent a437268 commit 8f7901e
Showing 2 changed files with 13 additions and 2 deletions.
12 changes: 11 additions & 1 deletion
@@ -23,6 +23,7 @@ import com.pingcap.tikv.exception.TiInternalException
 import com.pingcap.tikv.meta.TiTimestamp
 import com.pingcap.tikv.util.{BackOffer, ConcreteBackOffer}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory
 
 import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
@@ -31,7 +32,8 @@ case class ServiceSafePoint(
     serviceId: String,
     ttl: Long,
     GCMaxWaitTime: Long,
-    tiSession: TiSession) {
+    tiSession: TiSession,
+    sparkSession: SparkSession) {
 
   private final val logger = LoggerFactory.getLogger(getClass.getName)
   private var minStartTs = Long.MaxValue
@@ -40,6 +42,14 @@
   service.scheduleAtFixedRate(
     () => {
       if (minStartTs != Long.MaxValue) {
+        val now = tiSession.getTimestamp
+        if (now.getPhysical - TiTimestamp.extractPhysical(minStartTs) >= GCMaxWaitTime * 1000) {
+          val msg =
+            s"Can not pause GC more than spark.tispark.gc_max_wait_time=$GCMaxWaitTime s. start_ts: ${minStartTs}, now: ${now.getVersion}. You can adjust spark.tispark.gc_max_wait_time to increase the gc max wait time."
+          logger.error(msg)
+          sparkSession.stop()
+          throw new TiInternalException(msg)
+        }
         val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
           serviceId,
           ttl,
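
The guard added above compares wall-clock time recovered from two TSO timestamps: now.getPhysical for the current PD timestamp, and TiTimestamp.extractPhysical(minStartTs) for the oldest live transaction. In TiDB's TSO encoding the physical milliseconds sit in the high bits above 18 logical bits, so extracting the physical part amounts to a right shift. A minimal self-contained sketch of the same check, where the shift constant and helper names are assumptions for illustration, not the client library's code:

// A sketch of the new GC-wait guard; PhysicalShiftBits and the
// helper names are assumed, mirroring the TSO layout.
object GcWaitGuard {
  private val PhysicalShiftBits = 18 // TSO: physical ms above 18 logical bits

  // Recover the wall-clock milliseconds embedded in a TSO version.
  def extractPhysical(ts: Long): Long = ts >> PhysicalShiftBits

  // True when the oldest live transaction has held GC back for more
  // than gcMaxWaitTimeSec seconds, measured against the current TSO.
  def exceedsGcMaxWait(nowPhysicalMs: Long, minStartTs: Long, gcMaxWaitTimeSec: Long): Boolean =
    nowPhysicalMs - extractPhysical(minStartTs) >= gcMaxWaitTimeSec * 1000
}

// e.g. GcWaitGuard.exceedsGcMaxWait(now.getPhysical, minStartTs, GCMaxWaitTime)
// reproduces the condition in the hunk above.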
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/TiContext.scala
@@ -65,7 +65,8 @@ class TiContext(val sparkSession: SparkSession) extends Serializable with Logging
     "tispark_" + UUID.randomUUID,
     TiConfigConst.DEFAULT_GC_SAFE_POINT_TTL,
     GCMaxWaitTime,
-    tiSession)
+    tiSession,
+    sparkSession)
 
   sparkSession.sparkContext.addSparkListener(new SparkListener() {
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
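
The TiContext change threads the live SparkSession into ServiceSafePoint so that, when the guard trips, the safe-point keeper can stop the whole application rather than let queries read data that GC may already have reclaimed. For jobs that legitimately pin old snapshots, the error message itself points at raising spark.tispark.gc_max_wait_time (in seconds); a hedged sketch of setting it when building the session, with the app name and value purely illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative session setup raising the GC wait ceiling to two hours;
// the value is in seconds, matching the "=$GCMaxWaitTime s" error text.
val spark = SparkSession
  .builder()
  .appName("tispark-job") // hypothetical application name
  .config("spark.tispark.gc_max_wait_time", "7200")
  .getOrCreate()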
