diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 0b1f7bedd07..c186633f61e 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -65,15 +65,11 @@ trait ProcBuilder {
   // Visible for test
   private[kyuubi] var logCaptureThread: Thread = _
 
-  private lazy val engineLog: File = ProcBuilder.synchronized {
+  private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
     val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
     val currentTime = System.currentTimeMillis()
     val processLogPath = workingDir
-    val totalExistsFile = processLogPath.toFile.listFiles(new FilenameFilter() {
-      override def accept(dir: File, name: String): Boolean = {
-        name.startsWith(module)
-      }
-    })
+    val totalExistsFile = processLogPath.toFile.listFiles { (_, name) => name.startsWith(module) }
     val sorted = totalExistsFile.sortBy(_.getName.split("\\.").last.toInt)
     val nextIndex = if (sorted.isEmpty) {
       0
@@ -81,6 +77,18 @@ trait ProcBuilder {
       sorted.last.getName.split("\\.").last.toInt + 1
     }
     val file = sorted.find(_.lastModified() < currentTime - engineLogTimeout)
+      .map { existsFile =>
+        try {
+          // Here we want to overwrite the existing log file
+          existsFile.delete()
+          existsFile.createNewFile()
+          existsFile
+        } catch {
+          case e: Exception =>
+            warn(s"failed to delete engine log file: ${existsFile.getAbsolutePath}", e)
+            null
+        }
+      }
       .getOrElse {
         Files.createDirectories(processLogPath)
         val newLogFile = new File(processLogPath.toFile, s"$module.log.$nextIndex")
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 2c842b1f3f8..5f2424e74f6 100644
--- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -18,13 +18,15 @@ package org.apache.kyuubi.engine.spark
 
 import java.io.File
-import java.nio.file.{Files, Path, Paths}
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.time.Duration
 import java.util.concurrent.{Executors, TimeUnit}
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_LOG_TIMEOUT
 import org.apache.kyuubi.service.ServiceUtils
 
 class SparkProcessBuilderSuite extends KerberizedTestHelper {
 
@@ -179,6 +181,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
       atomicTest()
     }
   }
+
+  test("overwrite log file should cleanup before write") {
+    val fakeWorkDir = Files.createTempDirectory("fake")
+    val conf = KyuubiConf()
+    conf.set(ENGINE_LOG_TIMEOUT, Duration.ofDays(1).toMillis)
+    val builder1 = new FakeSparkProcessBuilder(conf) {
+      override val workingDir: Path = fakeWorkDir
+    }
+    val file1 = builder1.engineLog
+    Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
+    assert(file1.length() == 1)
+    Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
+    assert(file1.length() == 2)
+    file1.setLastModified(System.currentTimeMillis() - Duration.ofDays(1).toMillis - 1000)
+
+    val builder2 = new FakeSparkProcessBuilder(conf) {
+      override val workingDir: Path = fakeWorkDir
+    }
+    val file2 = builder2.engineLog
+ assert(file1.getAbsolutePath == file2.getAbsolutePath) + Files.write(file2.toPath, "a".getBytes(), StandardOpenOption.APPEND) + assert(file2.length() == 1) + } } class FakeSparkProcessBuilder(config: KyuubiConf)