Skip to content

Commit

Permalink
[SPARK-50783][CORE] Canonicalize JVM profiler results file name and l…
Browse files Browse the repository at this point in the history
…ayout on DFS
  • Loading branch information
pan3793 committed Jan 10, 2025
1 parent 793ad7c commit 0af224a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 39 deletions.
6 changes: 3 additions & 3 deletions connector/profiler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The profiler writes the jfr files to the executor's working directory in the exe
Code profiling is currently only supported for

* Linux (x64)
* Linux (arm 64)
* Linux (arm64)
* Linux (musl, x64)
* MacOS

Expand Down Expand Up @@ -54,7 +54,7 @@ Then enable the profiling in the configuration.
<td><code>spark.executor.profiling.dfsDir</code></td>
<td>(none)</td>
<td>
An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/{{APP_ID}}/profile-{{APP_ID}}-exec-{{EXECUTOR_ID}}.jfr</i> <br/>
If no <i>dfsDir</i> is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
</td>
<td>4.0.0</td>
Expand All @@ -72,7 +72,7 @@ Then enable the profiling in the configuration.
<td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
<td>
Options to pass to the profiler. Detailed options are documented in the comments here:
<a href="https://github.com/async-profiler/async-profiler/blob/32601bccd9e49adda9510a2ed79d142ac6ef0ff9/src/arguments.cpp#L52">Profiler arguments</a>.
<a href="https://github.com/async-profiler/async-profiler/blob/v3.0/src/arguments.cpp#L44">Profiler arguments</a>.
Note that the options to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package org.apache.spark.executor.profiler

import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
import java.net.URI
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PATH
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}


/**
Expand All @@ -38,15 +38,26 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
private var running = false
private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val appId = try {
conf.getAppId
} catch {
case _: NoSuchElementException => "local-" + System.currentTimeMillis
}
private val appAttemptId = conf.getOption("spark.app.attempt.id")
private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
private val profileFile = s"profile-$baseName-exec-$executorId.jfr"

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/$profileFile"
private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/$profileFile"

private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private var outputStream: FSDataOutputStream = _
private var inputStream: InputStream = _
Expand Down Expand Up @@ -89,28 +100,34 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
}
}

private def requireProfilerBaseDirAsDirectory(fs: FileSystem, profilerDfsDir: String): Unit = {
if (!fs.getFileStatus(new Path(profilerDfsDir)).isDirectory) {
throw new IllegalArgumentException(
s"Profiler DFS base directory $profilerDfsDir is not a directory.")
}
}

private def startWriting(): Unit = {
if (profilerDfsDir.isDefined) {
val applicationId = try {
conf.getAppId
} catch {
case _: NoSuchElementException => "local-" + System.currentTimeMillis
profilerDfsDirOpt.foreach { profilerDfsDir =>
val profilerDirForApp = s"$profilerDfsDir/$baseName"
val profileOutputFile = s"$profilerDirForApp/$profileFile"

val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)

requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)

val profilerDirForAppPath = new Path(profilerDirForApp)
if (!fs.exists(profilerDirForAppPath)) {
// SPARK-30860: use the class method to avoid the umask causing permission issues
FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
}
val config = SparkHadoopUtil.get.newConfiguration(conf)
val appName = conf.get("spark.app.name").replace(" ", "-")
val profilerOutputDirname = profilerDfsDir.get

val profileOutputFile =
s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
val fs = FileSystem.get(new URI(profileOutputFile), config);
val filenamePath = new Path(profileOutputFile)
outputStream = fs.create(filenamePath)

outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true)
}
logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
inputStream = new BufferedInputStream(
new FileInputStream(s"$profilerLocalDir/$profileFile"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
threadpool.scheduleWithFixedDelay(
new Runnable() {
Expand Down Expand Up @@ -158,14 +175,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
} catch {
case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
logError("Some profiler output not written." +
" Exception occurred in profiler native code: ", e)
logError("Some profiler output not written. " +
"Exception occurred in profiler native code: ", e)
case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
}
}

private def finishWriting(): Unit = {
if (profilerDfsDir.isDefined && writing) {
if (profilerDfsDirOpt.isDefined && writing) {
try {
// shutdown background writer
threadpool.shutdown()
Expand All @@ -177,8 +194,8 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case e: IOException =>
logWarning("Some profiling output not written." +
"Exception occurred while completing profiler output", e)
logWarning("Some profiling output not written. " +
"Exception occurred while completing profiler output: ", e)
}
writing = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,7 @@ object EventLogFileWriter {
}

def nameForAppAndAttempt(appId: String, appAttemptId: Option[String]): String = {
val base = Utils.sanitizeDirName(appId)
if (appAttemptId.isDefined) {
base + "_" + Utils.sanitizeDirName(appAttemptId.get)
} else {
base
}
Utils.nameForAppAndAttempt(appId, appAttemptId)
}

def codecName(log: Path): Option[String] = {
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2954,6 +2954,15 @@ private[spark] object Utils
str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT)
}

def nameForAppAndAttempt(appId: String, appAttemptId: Option[String]): String = {
val base = sanitizeDirName(appId)
if (appAttemptId.isDefined) {
base + "_" + sanitizeDirName(appAttemptId.get)
} else {
base
}
}

def isClientMode(conf: SparkConf): Boolean = {
"client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
}
Expand Down

0 comments on commit 0af224a

Please sign in to comment.