replace from_json with get_json_object
lgbo-ustc committed Jan 3, 2025
1 parent b956a72 commit a29153f
Showing 4 changed files with 104 additions and 0 deletions.
@@ -390,6 +390,13 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
)
}

def enableReplaceFromJsonWithGetJsonObject(): Boolean = {
SparkEnv.get.conf.getBoolean(
CHConf.runtimeConfig("enable_replace_from_json_with_get_json_object"),
defaultValue = true
)
}

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.get.enableNativeWriter.getOrElse(false)
}
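Since the new flag is read from SparkEnv.get.conf, it behaves as a startup-level Spark configuration rather than a per-query setting. A minimal sketch of disabling the rewrite when building a session, assuming CHConf.runtimeConfig resolves keys under the ClickHouse backend's spark.gluten.sql.columnar.backend.ch.runtime_config. prefix (the fully spelled-out key below is an assumption, not taken from this commit):

import org.apache.spark.sql.SparkSession

// Sketch only: assumed full key; with the flag off, from_json map lookups are left untouched.
val spark = SparkSession
  .builder()
  .config(
    "spark.gluten.sql.columnar.backend.ch.runtime_config." +
      "enable_replace_from_json_with_get_json_object",
    "false")
  .getOrCreate()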
@@ -61,6 +61,7 @@ object CHRuleApi {
(spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
injector.injectResolutionRule(spark => new RewriteToDateExpresstionRule(spark))
injector.injectResolutionRule(spark => new RewriteDateTimestampComparisonRule(spark))
injector.injectResolutionRule(spark => new ReplaceFromJsonWithGetJsonObject(spark))
injector.injectOptimizerRule(spark => new CommonSubexpressionEliminateRule(spark))
injector.injectOptimizerRule(spark => new ExtendedColumnPruning(spark))
injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
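The rule is injected into the analyzer's resolution phase; note that its apply method below only rewrites fully resolved plans. Outside of Gluten's injector, the same wiring could be expressed directly with Spark's SparkSessionExtensions; a sketch, not part of this commit:

import org.apache.spark.sql.SparkSession

// Sketch: CHRuleApi normally performs this registration; shown only to illustrate where
// the rule sits among the analyzer's extended resolution rules.
val spark = SparkSession
  .builder()
  .withExtensions(_.injectResolutionRule(session => new ReplaceFromJsonWithGetJsonObject(session)))
  .getOrCreate()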
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/*
* This file includes rules to replace expressions with more efficient equivalents.
*/

// Try to replace `from_json` with `get_json_object` if possible.
class ReplaceFromJsonWithGetJsonObject(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
logError(s"xxx ${GlutenConfig.get.enableColumnarBatchScan}")
if (!CHBackendSettings.enableReplaceFromJsonWithGetJsonObject || !plan.resolved) {
plan;
} else {
visitPlan(plan)
}
}

def visitPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
case project: Project =>
val newProjectList = project.projectList.map(expr => visitNamedExpression(expr))
Project(newProjectList, visitPlan(project.child))
case other =>
other.withNewChildren(other.children.map(visitPlan))
}
}

def visitNamedExpression(namedExpr: NamedExpression): NamedExpression = {
namedExpr.withNewChildren(namedExpr.children.map(visitExpression)).asInstanceOf[NamedExpression]
}

def visitExpression(expr: Expression): Expression = {
expr match {
// Match map lookups of the form from_json(x, 'Map<String, String>')['key'] where the key is a
// string literal, and rewrite them to get_json_object(x, '$.key'). GetMapValue.dataType is the
// map's value type, so the MapType check must be done on the child's dataType.
case getMapValue: GetMapValue
if getMapValue.child.isInstanceOf[JsonToStructs] &&
getMapValue.child.dataType.isInstanceOf[MapType] &&
getMapValue.child.dataType.asInstanceOf[MapType].valueType.isInstanceOf[StringType] &&
getMapValue.key.isInstanceOf[Literal] &&
getMapValue.key.dataType.isInstanceOf[StringType] =>
val child = getMapValue.child.asInstanceOf[JsonToStructs].child
val key = UTF8String.fromString(s"$$.${getMapValue.key.asInstanceOf[Literal].value}")
GetJsonObject(child, Literal(key, StringType))
case other =>
other.withNewChildren(other.children.map(visitExpression))
}
}
}
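For reference, a small standalone sketch of the two query shapes this rule treats as interchangeable; the table, view, and object names are illustrative and not part of the commit:

import org.apache.spark.sql.SparkSession

object FromJsonRewriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("from_json-rewrite").getOrCreate()
    import spark.implicits._

    Seq("""{"a":1}""", """{"a":"5"}""", """{"a":{"x":1}}""").toDF("x").createOrReplaceTempView("t")

    // Shape the rule matches: GetMapValue(JsonToStructs(x, Map<String, String>), Literal("a")).
    spark.sql("select from_json(x, 'Map<String, String>')['a'] from t").show()

    // Shape it rewrites to: GetJsonObject(x, Literal("$.a")). For string-valued maps the two
    // queries return the same values, which the new test below checks against vanilla Spark.
    spark.sql("select get_json_object(x, '$.a') from t").show()

    spark.stop()
  }
}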
@@ -902,4 +902,25 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
}
compareResultsAgainstVanillaSpark(sql, true, checkProjects, false)
}

test("GLUTEN-8406 replace from_json with get_json_object") {
withTable("test_8406") {
spark.sql("create table test_8406(x string) using parquet")
val insert_sql =
"""
|insert into test_8406 values
|('{"a":1}'),
|('{"a":2'),
|('{"b":3}'),
|('{"a":"5"}'),
|('{"a":{"x":1}}')
|""".stripMargin
spark.sql(insert_sql)
val sql =
"""
|select from_json(x, 'Map<String, String>')['a'] from test_8406
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
}
}
