Commit a369ed7: fix style and add some comment

lsm1 committed Dec 7, 2023 (1 parent: 9b7ce8d)
Showing 7 changed files with 30 additions and 26 deletions.
docs/configuration/settings.md (3 additions, 3 deletions)

```diff
@@ -383,9 +383,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
 | kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> | string | 1.7.0 |
 | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. Set this value to 0 to disable the max rows limit. | int | 1.6.0 |
-| kyuubi.operation.result.save.to.file | false | The switch for Spark query result save to hdfs file | boolean | 1.9.0 |
-| kyuubi.operation.result.save.to.file.path | /tmp/kyuubi/tmp_kyuubi_result | The hdfs path of Spark query result save | string | 1.9.0 |
-| kyuubi.operation.result.save.to.file.threshold | 209715200 | The threshold of Spark result save to hdfs file, default value is 200 MB | int | 1.9.0 |
+| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The directory where Spark query results are saved. It should be publicly accessible to every engine. Results are saved in ORC format, under the directory structure `{dir}/{engineId}/{sessionId}/{statementId}`. Each query's result files are deleted when the query finishes. | string | 1.9.0 |
+| kyuubi.operation.result.saveToFile.enabled | false | Whether to save Spark query results to a file. | boolean | 1.9.0 |
+| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum estimated result size required before the result is saved to a file; the default is 200 MB. Spark's `EstimationUtils#getSizePerRow` is used to estimate the output size of the execution plan. | long | 1.9.0 |
 | kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
 | kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
 | kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
```
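
For reference, enabling the new result-spill behavior takes only the three renamed properties above. A minimal sketch of a `$KYUUBI_HOME/conf/kyuubi-defaults.conf` fragment, with the directory and size values shown being the documented defaults:

```properties
# Write large Spark query results to files instead of collecting them at the driver
kyuubi.operation.result.saveToFile.enabled=true
# Must be accessible to every engine; results land under
# {dir}/{engineId}/{sessionId}/{statementId} in ORC format
kyuubi.operation.result.saveToFile.dir=/tmp/kyuubi/tmp_kyuubi_result
# Only results whose estimated size reaches 200 MB are spilled to files
kyuubi.operation.result.saveToFile.minSize=209715200
```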
SparkSQLEngine.scala

```diff
@@ -91,7 +91,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }

     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
       engineSavePath = Some(s"$savePath/$engineId")
       val path = new Path(engineSavePath.get)
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
```
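
The hunk is collapsed just after the `FileSystem` is resolved. As a self-contained sketch of the Hadoop `FileSystem` calls an engine-scoped result directory relies on (the `mkdirs`/`delete` pairing is an illustrative assumption, not the commit's exact follow-up code, and the values are stand-ins for what the engine reads from KyuubiConf):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative values; the engine derives them from KyuubiConf and its engine id.
val savePath = "/tmp/kyuubi/tmp_kyuubi_result"
val engineId = "engine-1"

val path = new Path(s"$savePath/$engineId")
// Resolve the FileSystem implementation (HDFS, local, S3A, ...) from the path's scheme.
val fs: FileSystem = path.getFileSystem(new Configuration())
fs.mkdirs(path)       // assumed: ensure the engine-scoped result directory exists
// ... statements write per-query result files under this directory ...
fs.delete(path, true) // assumed: recursive cleanup when the engine shuts down
```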
ExecuteStatement.scala

```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._

 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_PATH, OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -172,12 +172,14 @@ class ExecuteStatement(
         })
     } else {
       val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
-      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToFs(resultMaxRows, threshold, result)) {
         val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
         saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        // Rename all columns to avoid duplicate column names
         val colName = range(0, result.schema.size).map(x => "col" + x)
+        // df.write introduces an extra shuffle for the outermost limit, which can hurt performance
         if (resultMaxRows > 0) {
           result.toDF(colName: _*).limit(resultMaxRows).write
             .option("compression", "zstd").format("orc").save(saveFileName.get)
```
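
The comment added here matters because Spark's file write path rejects duplicate column names, which are easy to produce with queries like `SELECT a, a FROM t`; renaming positionally to `col0, col1, ...` guarantees uniqueness. A runnable sketch of the same pattern, with a hypothetical output path:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A result with duplicate column names, which a plain .write would reject.
val result: DataFrame = Seq((1, 1), (2, 2)).toDF("a", "a")

// Rename columns positionally to col0, col1, ... to guarantee uniqueness.
val colName = Range(0, result.schema.size).map(x => "col" + x)
result.toDF(colName: _*)
  .write
  .option("compression", "zstd")   // zstd-compressed ORC, as in the commit
  .format("orc")
  .save("/tmp/kyuubi_result_demo") // hypothetical path, for illustration only
```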
FetchOrcStatement.scala

```diff
@@ -77,12 +77,9 @@ class FetchOrcStatement(spark: SparkSession) {
   private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
     try {
       if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
-        // https://issues.apache.org/jira/browse/SPARK-34535
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
         DynConstructors.builder()
-          .impl(
-            classOf[OrcDeserializer],
-            classOf[StructType],
-            classOf[Array[Int]])
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
           .build[OrcDeserializer]()
           .newInstance(
             orcSchema,
```
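
`DynConstructors` is a small reflection helper that binds to whichever `OrcDeserializer` constructor the running Spark version exposes. A hedged sketch of the same idea using plain Java reflection; the pre-3.2 three-argument signature shown is an assumption inferred from SPARK-34535, not taken from this commit:

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

def newOrcDeserializer(
    dataSchema: StructType,
    requiredSchema: StructType,
    requestedColIds: Array[Int]): OrcDeserializer = {
  val cls = classOf[OrcDeserializer]
  try {
    // Spark >= 3.2 (SPARK-34535): OrcDeserializer(requiredSchema, requestedColIds)
    cls.getConstructor(classOf[StructType], classOf[Array[Int]])
      .newInstance(requiredSchema, requestedColIds)
  } catch {
    case _: NoSuchMethodException =>
      // Assumed older signature: OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
      cls.getConstructor(classOf[StructType], classOf[StructType], classOf[Array[Int]])
        .newInstance(dataSchema, requiredSchema, requestedColIds)
  }
}
```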
SparkSQLSessionManager.scala

```diff
@@ -183,7 +183,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       stopSession()
     }
     if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)}/" +
+      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
        s"$engineId/${sessionHandle.identifier}")
      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
      info(s"Delete session result file $path")
```
SparkDatasetHelper.scala

```diff
@@ -280,7 +280,7 @@ object SparkDatasetHelper extends Logging {
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }

-  def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {
+  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
     if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
       return false
     }
@@ -300,7 +300,7 @@
     } else {
       result.schema.size
     }
-    threshold > 0 && colSize > 0 && stats >= threshold
+    minSize > 0 && colSize > 0 && stats >= minSize
   }

   private def isCommandExec(nodeName: String): Boolean = {
```
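
`shouldSaveResultToFs` gates on an estimated output size rather than the materialized one, which is why the config was renamed to `minSize`. A hypothetical standalone sketch of such a gate, where `sizePerRow` and `rowCount` are stand-ins for estimates taken from the plan's statistics (the collapsed middle of the real method computes `stats` differently):

```scala
// Hypothetical helpers: sizePerRow would come from Spark's
// EstimationUtils#getSizePerRow, rowCount from the optimized plan's stats.
def shouldSave(
    resultMaxRows: Int,
    minSize: Long,
    sizePerRow: Long,
    rowCount: Long,
    colSize: Int): Boolean = {
  // Cap the row count at resultMaxRows when a limit is configured.
  val rows = if (resultMaxRows > 0) math.min(resultMaxRows.toLong, rowCount) else rowCount
  val estimatedBytes = sizePerRow * rows
  // Spill only when estimation is enabled (minSize > 0), the schema is
  // non-empty, and the estimate reaches the configured minimum size.
  minSize > 0 && colSize > 0 && estimatedBytes >= minSize
}
```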
KyuubiConf.scala

```diff
@@ -1894,26 +1894,31 @@ object KyuubiConf {
       .createWithDefault(0)

   val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
-    buildConf("kyuubi.operation.result.save.to.file")
-      .doc("The switch for Spark query result save to hdfs file")
+    buildConf("kyuubi.operation.result.saveToFile.enabled")
+      .doc("Whether to save Spark query results to a file.")
       .version("1.9.0")
       .booleanConf
       .createWithDefault(false)

-  val OPERATION_RESULT_SAVE_TO_FILE_PATH: ConfigEntry[String] =
-    buildConf("kyuubi.operation.result.save.to.file.path")
-      .doc("The hdfs path of Spark query result save")
+  val OPERATION_RESULT_SAVE_TO_FILE_DIR: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.saveToFile.dir")
+      .doc("The directory where Spark query results are saved. It should be publicly" +
+        " accessible to every engine. Results are saved in ORC format, under the" +
+        " directory structure `{dir}/{engineId}/{sessionId}/{statementId}`." +
+        " Each query's result files are deleted when the query finishes.")
       .version("1.9.0")
       .stringConf
       .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")

-  val OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD: ConfigEntry[Int] =
-    buildConf("kyuubi.operation.result.save.to.file.threshold")
-      .doc("The threshold of Spark result save to hdfs file, default value is 200 MB")
+  val OPERATION_RESULT_SAVE_TO_FILE_MINSIZE: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.result.saveToFile.minSize")
+      .doc("The minimum estimated result size required before the result is saved to a" +
+        " file; the default is 200 MB. Spark's `EstimationUtils#getSizePerRow` is used" +
+        " to estimate the output size of the execution plan.")
       .version("1.9.0")
-      .intConf
+      .longConf
       .checkValue(_ > 0, "must be positive value")
-      .createWithDefault(209715200)
+      .createWithDefault(200 * 1024 * 1024)

   val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.incremental.collect")
```
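
Reading the renamed entries back follows the same `conf.get` pattern the engine code above uses; a minimal sketch, assuming the standard `KyuubiConf` constructor and `set` API:

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._

val conf = KyuubiConf()
conf.set(OPERATION_RESULT_SAVE_TO_FILE, true) // override the false default

val enabled: Boolean = conf.get(OPERATION_RESULT_SAVE_TO_FILE)         // true
val dir: String      = conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)     // /tmp/kyuubi/tmp_kyuubi_result
val minSize: Long    = conf.get(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE) // 209715200
```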
