fix app id unavailable during driver plugin init
pan3793 committed Jan 14, 2025
commit d001467 (1 parent: cb0e06f)
Showing 1 changed file with 31 additions and 25 deletions.
@@ -42,13 +42,10 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
 
-  private val appId = try {
-    conf.getAppId
-  } catch {
-    case _: NoSuchElementException => "local-" + System.currentTimeMillis
-  }
-  private val appAttemptId = conf.getOption("spark.app.attempt.id")
-  private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
+  // app_id and app_attempt_id are unavailable during driver plugin initialization
+  private def getAppId: Option[String] = conf.getOption("spark.app.id")
+  private def getAttemptId: Option[String] = conf.getOption("spark.app.attempt.id")
+
   private val profileFile = if (executorId == DRIVER_IDENTIFIER) {
     s"profile-$executorId.jfr"
   } else {
@@ -63,7 +60,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
   private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
   private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
-  private var outputStream: FSDataOutputStream = _
+  @volatile private var outputStream: FSDataOutputStream = _
   private var inputStream: InputStream = _
   private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
   private var threadpool: ScheduledExecutorService = _
@@ -112,24 +109,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
   }
 
   private def startWriting(): Unit = {
-    profilerDfsDirOpt.foreach { profilerDfsDir =>
-      val profilerDirForApp = s"$profilerDfsDir/$baseName"
-      val profileOutputFile = s"$profilerDirForApp/$profileFile"
-
-      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-      val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)
-
-      requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)
-
-      val profilerDirForAppPath = new Path(profilerDirForApp)
-      if (!fs.exists(profilerDirForAppPath)) {
-        // SPARK-30860: use the class method to avoid the umask causing permission issues
-        FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
-      }
-
-      outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
+    profilerDfsDirOpt.foreach { _ =>
       try {
-        logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
         inputStream = new BufferedInputStream(
           new FileInputStream(s"$profilerLocalDir/$profileFile"))
         threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
@@ -161,6 +142,31 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     if (!writing) {
       return
     }
+
+    if (outputStream == null) {
+      while (getAppId.isEmpty) {
+        logDebug("Waiting for Spark application started")
+        Thread.sleep(1000L)
+      }
+      val baseName = Utils.nameForAppAndAttempt(getAppId.get, getAttemptId)
+      val profilerDirForApp = s"${profilerDfsDirOpt.get}/$baseName"
+      val profileOutputFile = s"$profilerDirForApp/$profileFile"
+
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val fs = Utils.getHadoopFileSystem(profilerDfsDirOpt.get, hadoopConf)
+
+      requireProfilerBaseDirAsDirectory(fs, profilerDfsDirOpt.get)
+
+      val profilerDirForAppPath = new Path(profilerDirForApp)
+      if (!fs.exists(profilerDirForAppPath)) {
+        // SPARK-30860: use the class method to avoid the umask causing permission issues
+        FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
+      }
+
+      logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
+      outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
+    }
+
     try {
       // stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
       // the events while the file is being dumped, but that is the only way to make sure that
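
The reworked flow defers all DFS setup until `spark.app.id` is actually present in the conf: instead of falling back to a synthetic `local-<timestamp>` id at construction time, the scheduled copy task polls for the app id, builds the per-app directory name with `Utils.nameForAppAndAttempt`, and only then opens the output stream. `outputStream` is marked `@volatile` because it is now assigned from that background thread rather than in the constructor. A minimal standalone sketch of the deferral pattern, assuming a Map-backed stand-in for SparkConf and a local file in place of the Hadoop FileSystem (the object and method names here are illustrative, not part of the patch):

```scala
import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
import java.nio.file.{Files, Paths}
import java.util.concurrent.ConcurrentHashMap

object LazyProfileOutput {
  // Stand-in for SparkConf: keys such as "spark.app.id" only appear once the
  // application has actually started, mirroring driver plugin initialization.
  private val conf = new ConcurrentHashMap[String, String]()

  // Assigned from the background copy thread and read elsewhere; volatile
  // mirrors the @volatile added to outputStream in the patch.
  @volatile private var outputStream: OutputStream = _

  private def getAppId: Option[String] = Option(conf.get("spark.app.id"))

  // Defer opening the per-app output file until the app id is published.
  private def ensureOutputStream(baseDir: String, fileName: String): OutputStream = {
    if (outputStream == null) {
      while (getAppId.isEmpty) {
        // The driver plugin can start before spark.app.id is set; poll rather
        // than fall back to a synthetic "local-<timestamp>" id.
        Thread.sleep(1000L)
      }
      val dirForApp = Paths.get(baseDir, getAppId.get)
      Files.createDirectories(dirForApp)
      outputStream = new BufferedOutputStream(
        new FileOutputStream(dirForApp.resolve(fileName).toFile))
    }
    outputStream
  }

  def main(args: Array[String]): Unit = {
    // Simulate the app id becoming available shortly after the plugin starts.
    new Thread(() => { Thread.sleep(500L); conf.put("spark.app.id", "app-123") }).start()
    val out = ensureOutputStream("/tmp/profiler", "profile-driver.jfr")
    out.write("jfr bytes".getBytes)
    out.close()
  }
}
```

Since the wait loop runs inside the copy task scheduled on `profilerOutputThread`, blocking there only delays the first upload of the locally written JFR file; the profiling itself keeps writing to `profilerLocalDir` in the meantime.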
