[SPARK-50811] Support enabling JVM profiler on driver
pan3793 committed Jan 14, 2025
1 parent 2d498d5 commit cb0e06f

Showing 3 changed files with 49 additions and 12 deletions.
ExecutorJVMProfiler.scala

@@ -24,12 +24,12 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission

 import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.DRIVER_IDENTIFIER
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
 import org.apache.spark.util.{ThreadUtils, Utils}
-

 /**
  * A class that enables the async JVM code profiler
  */
@@ -49,7 +49,11 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   }
   private val appAttemptId = conf.getOption("spark.app.attempt.id")
   private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
-  private val profileFile = s"profile-exec-$executorId.jfr"
+  private val profileFile = if (executorId == DRIVER_IDENTIFIER) {
+    s"profile-$executorId.jfr"
+  } else {
+    s"profile-exec-$executorId.jfr"
+  }

   private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
   private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
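For context, SparkContext.DRIVER_IDENTIFIER is the constant "driver", so with this change the driver's profile is written as profile-driver.jfr while executors keep the existing profile-exec-<id>.jfr names. A minimal standalone sketch of the naming rule (profileFileName is a hypothetical helper; the constant's value is inlined):

    // Mirrors the conditional above; "driver" is the value of SparkContext.DRIVER_IDENTIFIER.
    def profileFileName(executorId: String): String =
      if (executorId == "driver") s"profile-$executorId.jfr"
      else s"profile-exec-$executorId.jfr"

    assert(profileFileName("driver") == "profile-driver.jfr")
    assert(profileFileName("7") == "profile-exec-7.jfr")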
ExecutorProfilerPlugin.scala

@@ -21,38 +21,64 @@ import java.util.{Map => JMap}

 import scala.jdk.CollectionConverters._
 import scala.util.Random

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.EXECUTOR_ID


 /**
  * Spark plugin to do JVM code profiling of executors
  */
 class ExecutorProfilerPlugin extends SparkPlugin {
-  override def driverPlugin(): DriverPlugin = null
+  override def driverPlugin(): DriverPlugin = new JVMProfilerDriverPlugin

   // No-op
   override def executorPlugin(): ExecutorPlugin = new JVMProfilerExecutorPlugin
 }

+class JVMProfilerDriverPlugin extends DriverPlugin with Logging {
+
+  private var sparkConf: SparkConf = _
+  private var pluginCtx: PluginContext = _
+  private var profiler: ExecutorJVMProfiler = _
+  private var driverProfilingEnabled: Boolean = _
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    pluginCtx = ctx
+    sparkConf = ctx.conf()
+    driverProfilingEnabled = sparkConf.get(DRIVER_PROFILING_ENABLED)
+    if (driverProfilingEnabled) {
+      logInfo("Driver starting JVM code profiling")
+      profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
+      profiler.start()
+    }
+    Map.empty[String, String].asJava
+  }
+
+  override def shutdown(): Unit = {
+    logInfo("Driver JVM profiler shutting down")
+    if (profiler != null) {
+      profiler.stop()
+    }
+  }
+}
+
 class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging {

   private var sparkConf: SparkConf = _
   private var pluginCtx: PluginContext = _
   private var profiler: ExecutorJVMProfiler = _
-  private var codeProfilingEnabled: Boolean = _
-  private var codeProfilingFraction: Double = _
+  private var executorProfilingEnabled: Boolean = _
+  private var executorProfilingFraction: Double = _
   private val rand: Random = new Random(System.currentTimeMillis())

   override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
     pluginCtx = ctx
     sparkConf = ctx.conf()
-    codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
-    if (codeProfilingEnabled) {
-      codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
-      if (rand.nextInt(100) * 0.01 < codeProfilingFraction) {
+    executorProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
+    if (executorProfilingEnabled) {
+      executorProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
+      if (rand.nextInt(100) * 0.01 < executorProfilingFraction) {
         logInfo(log"Executor id ${MDC(EXECUTOR_ID, pluginCtx.executorID())} " +
           log"selected for JVM code profiling")
         profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
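The executor-side sampling is unchanged by this commit: rand.nextInt(100) returns an integer in [0, 99], so rand.nextInt(100) * 0.01 is one of 0.00, 0.01, ..., 0.99, and an executor opts into profiling with probability roughly equal to the configured fraction, at 1% granularity. A standalone sketch of that check (selectedForProfiling is a hypothetical name):

    import scala.util.Random

    // True with probability ~fraction; each executor evaluates this once at startup.
    def selectedForProfiling(rand: Random, fraction: Double): Boolean =
      rand.nextInt(100) * 0.01 < fraction

    // Rough sanity check: with fraction = 0.1, about 10% of trials are selected.
    val rand = new Random(42L)
    val hits = (1 to 10000).count(_ => selectedForProfiling(rand, 0.1))
    println(s"selected $hits of 10000")  // expect roughly 1000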
package.scala

@@ -22,6 +22,13 @@ import org.apache.spark.internal.config.ConfigBuilder

 package object profiler {

+  private[profiler] val DRIVER_PROFILING_ENABLED =
+    ConfigBuilder("spark.driver.profiling.enabled")
+      .doc("Turn on code profiling via async_profiler in driver.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[profiler] val EXECUTOR_PROFILING_ENABLED =
     ConfigBuilder("spark.executor.profiling.enabled")
       .doc("Turn on code profiling via async_profiler in executors.")
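Taken together, driver profiling is enabled through the same plugin that already handles executors. A hypothetical enablement sketch (the plugin's fully qualified class name is assumed from this module's package; spark.driver.profiling.enabled is the config added above and defaults to false):

    import org.apache.spark.sql.SparkSession

    // Assumed plugin FQCN; both profiling configs are read by the plugins in this commit.
    val spark = SparkSession.builder()
      .appName("profiled-app")
      .config("spark.plugins", "org.apache.spark.executor.profiler.ExecutorProfilerPlugin")
      .config("spark.driver.profiling.enabled", "true")
      .config("spark.executor.profiling.enabled", "true")
      .getOrCreate()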
