From 9b7ce8dbafc98dc2ea494a56dcef40ae6618893a Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Thu, 30 Nov 2023 14:59:31 +0800 Subject: [PATCH] Close orc fetchOrcStatement and remove result save file when ExecuteStatement close --- .../spark/operation/ExecuteStatement.scala | 23 +++++++--- .../spark/operation/FetchOrcStatement.scala | 45 +++++++++++++++---- .../spark/sql/kyuubi/SparkDatasetHelper.scala | 12 +++-- .../org/apache/kyuubi/config/KyuubiConf.scala | 1 + 4 files changed, 61 insertions(+), 20 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index d7abcc08354..3bfd4a1c87e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException import scala.Array._ import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.Path import org.apache.spark.sql.DataFrame import org.apache.spark.sql.kyuubi.SparkDatasetHelper._ import org.apache.spark.sql.types._ @@ -47,6 +48,8 @@ class ExecuteStatement( override def getOperationLog: Option[OperationLog] = Option(operationLog) override protected def supportProgress: Boolean = true + private var fetchOrcStatement: Option[FetchOrcStatement] = None + private var saveFileName: Option[String] = None override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { new StructType().add("Result", "string") @@ -65,6 +68,15 @@ class ExecuteStatement( OperationLog.removeCurrentOperationLog() } + override def close(): Unit = { + super.close() + fetchOrcStatement.foreach(_.close()) + saveFileName.foreach { p => + val path = new Path(p) + path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true) + } + } + protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = { resultDF.toLocalIterator().asScala } @@ -164,17 +176,18 @@ class ExecuteStatement( if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) { val sessionId = session.handle.identifier.toString val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH) - val fileName = s"$savePath/$engineId/$sessionId/$statementId" + saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId") val colName = range(0, result.schema.size).map(x => "col" + x) if (resultMaxRows > 0) { result.toDF(colName: _*).limit(resultMaxRows).write - .option("compression", "zstd").format("orc").save(fileName) + .option("compression", "zstd").format("orc").save(saveFileName.get) } else { result.toDF(colName: _*).write - .option("compression", "zstd").format("orc").save(fileName) + .option("compression", "zstd").format("orc").save(saveFileName.get) } - info(s"Save result to $fileName") - return new FetchOrcStatement(spark).getIterator(fileName, resultSchema) + info(s"Save result to $saveFileName") + fetchOrcStatement = Some(new FetchOrcStatement(spark)) + return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema) } val internalArray = if (resultMaxRows <= 0) { info("Execute in full collect mode") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala index 9f28fc22952..4d00c5b0cb8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala @@ -36,9 +36,13 @@ import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer import org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiException +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator} +import org.apache.kyuubi.util.reflect.DynConstructors class FetchOrcStatement(spark: SparkSession) { + + var orcIter: OrcFileIterator = _ def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = { val conf = spark.sparkContext.hadoopConfiguration val savePath = new Path(path) @@ -59,21 +63,42 @@ class FetchOrcStatement(spark: SparkSession) { AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) val deserializer = getOrcDeserializer(orcSchema, colId) - val iter = new OrcFileIterator(list) - val iterRow = iter.map(value => + orcIter = new OrcFileIterator(list) + val iterRow = orcIter.map(value => unsafeProjection(deserializer.deserialize(value))) .map(value => toRowConverter(value)) new IterableFetchIterator[Row](iterRow.toIterable) } + def close(): Unit = { + orcIter.close() + } + private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = { try { - val cls = Class.forName("org.apache.spark.sql.execution.datasources.orc.OrcDeserializer") - val constructor = cls.getDeclaredConstructors.apply(0) - if (constructor.getParameterCount == 3) { - constructor.newInstance(new StructType, orcSchema, colId).asInstanceOf[OrcDeserializer] + if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") { + // https://issues.apache.org/jira/browse/SPARK-34535 + DynConstructors.builder() + .impl( + classOf[OrcDeserializer], + classOf[StructType], + classOf[Array[Int]]) + .build[OrcDeserializer]() + .newInstance( + orcSchema, + colId) } else { - constructor.newInstance(orcSchema, colId).asInstanceOf[OrcDeserializer] + DynConstructors.builder() + .impl( + classOf[OrcDeserializer], + classOf[StructType], + classOf[StructType], + classOf[Array[Int]]) + .build[OrcDeserializer]() + .newInstance( + new StructType, + orcSchema, + colId) } } catch { case e: Throwable => @@ -84,7 +109,7 @@ class FetchOrcStatement(spark: SparkSession) { class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] { - val iters = fileList.map(x => getOrcFileIterator(x)) + private val iters = fileList.map(x => getOrcFileIterator(x)) var idx = 0 @@ -106,6 +131,10 @@ class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[ } } + def close(): Unit = { + iters.foreach(_.close()) + } + private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = { val orcRecordReader = { val split = diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 3df2c873a9c..06f8501c177 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SortExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -281,8 +281,10 @@ object SparkDatasetHelper extends Logging { } def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = { + if (isCommandExec(result.queryExecution.executedPlan.nodeName)) { + return false + } lazy val limit = result.queryExecution.executedPlan match { - case plan if isCommandExec(plan.nodeName) => 0 case collectLimit: CollectLimitExec => collectLimit.limit case _ => resultMaxRows } @@ -292,17 +294,13 @@ object SparkDatasetHelper extends Logging { } else { result.queryExecution.optimizedPlan.stats.sizeInBytes } - lazy val isSort = result.queryExecution.sparkPlan match { - case s: SortExec => s.global - case _ => false - } lazy val colSize = if (result == null || result.schema.isEmpty) { 0 } else { result.schema.size } - threshold > 0 && colSize > 0 && !isSort && stats >= threshold + threshold > 0 && colSize > 0 && stats >= threshold } private def isCommandExec(nodeName: String): Boolean = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index c035bc168df..8093e64e26d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1912,6 +1912,7 @@ object KyuubiConf { .doc("The threshold of Spark result save to hdfs file, default value is 200 MB") .version("1.9.0") .intConf + .checkValue(_ > 0, "must be positive value") .createWithDefault(209715200) val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =