Skip to content

Commit

Permalink
update documents
Browse files Browse the repository at this point in the history
  • Loading branch information
wankunde committed Mar 17, 2021
1 parent 7b4bcba commit d0f2973
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 39 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ SELECT /*+ COLLECT_VALUE('row_count2', 'd') */ count(1) as d;

# 参数管理

系统执行过程中会有很多运行依赖参数,包括时间参数、系统参数和Set命令参数
系统执行过程中会有很多运行依赖参数,包括时间参数、系统参数和Set命令参数。
系统通过set命令、apollo配置等方式进行参数定义,在程序中使用`${variable}`的格式引用参数。
通过`${variable, 'DEFAULT_VALUE'}`格式引用参数时,如果没有找到`variable`参数,则返回`DEFAULT_VALUE`。

## 时间参数

Expand Down
27 changes: 27 additions & 0 deletions src/main/scala/org/apache/spark/sql/util/StringUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 Leyantech Ltd. All Rights Reserved.
package org.apache.spark.sql.util

import org.apache.commons.lang3.StringUtils

/**
* @author kun.wan, <[email protected]>
* @date 2021-03-17.
*/
object StringUtil {

  /**
   * Pairs of opening and closing delimiters that [[escapeStringValue]] strips from around a
   * value: double quotes, single quotes and parentheses.
   */
  val escapeMapping: Map[Array[Char], Array[Char]] = Map(
    Array('\"') -> Array('\"'),
    // `'\''` instead of `'''`: the unescaped form is not a spec-conformant character literal.
    Array('\'') -> Array('\''),
    Array('(') -> Array(')')
  )

  // The delimiter pairs as strings, built once instead of on every call.
  private val wrappers: Seq[(String, String)] =
    escapeMapping.toSeq.map { case (open, close) => (new String(open), new String(close)) }

  /**
   * Trims `text` and removes the enclosing delimiters listed in [[escapeMapping]].
   *
   * Each pair is tried once, in the iteration order of [[escapeMapping]], against the result of
   * the previous removal, and the value is trimmed again after every removal. For example
   * `"'abc'"` becomes `abc` and `( a b )` becomes `a b`.
   *
   * A value is only unwrapped when it is long enough to contain both delimiters, so a lone
   * `'` or `"` is returned unchanged instead of being reduced to the empty string.
   *
   * @param text the raw value; must not be null
   * @return the trimmed value without its enclosing delimiters
   */
  def escapeStringValue(text: String): String =
    wrappers.foldLeft(text.trim) { case (value, (open, close)) =>
      val wrapped =
        value.length >= open.length + close.length &&
          value.startsWith(open) &&
          value.endsWith(close)
      if (wrapped) value.substring(open.length, value.length - close.length).trim else value
    }
}
10 changes: 3 additions & 7 deletions src/main/scala/org/apache/sql/runner/command/BaseCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package org.apache.sql.runner.command

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.util.Logging
import org.apache.spark.sql.util.{Logging, StringUtil}

/**
* @author kun.wan, <[email protected]>
Expand Down Expand Up @@ -49,12 +49,8 @@ abstract class BaseCommand(sourceChars: SourceChars) extends Logging {
}
}
assert(index >= 0, s"Parse Job Error!\n${new String(chars.slice(sourceChars.start, sourceChars.end))}")
var res = new String(chars.slice(sourceChars.start, index)).trim
for ((startChars, endChars) <- escapeMapping
if res.startsWith(new String(startChars)) && res.endsWith(new String(endChars))) {
res = StringUtils.removeStart(res, new String(startChars))
res = StringUtils.removeEnd(res, new String(endChars)).trim
}
val res =
StringUtil.escapeStringValue(new String(chars.slice(sourceChars.start, index)))
val nextStart = i + 1
(res, index, nextStart)
}
Expand Down
11 changes: 10 additions & 1 deletion src/main/scala/org/apache/sql/runner/command/SetCommand.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019 Leyantech Ltd. All Rights Reserved.
package org.apache.sql.runner.command

import org.apache.sql.runner.config.VariableSubstitution
import org.apache.sql.runner.container.ConfigContainer

/**
Expand All @@ -17,7 +18,15 @@ case class SetCommand(sourceChars: SourceChars) extends BaseCommand(sourceChars)

val (key, _, valueStart) = readTo('=')
sourceChars.start = valueStart
val (value, _, nextStart) = readTo(';')
val (value, _, nextStart) =
readTo(';') match {
case (value, index, nextStart) =>
val substitutionValue =
VariableSubstitution.withSubstitution { substitution =>
substitution.substitute(value)
}
(substitutionValue, index, nextStart)
}
sourceChars.start = nextStart

override def toString: String = s"${CommandFactory.setPrefix} $key = $value;"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ package org.apache.sql.runner.config
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.util.{Logging, SystemVariables}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.util.{Logging, StringUtil, SystemVariables}
import org.apache.sql.runner.container.{CollectorContainer, ConfigContainer}
import org.apache.sql.runner.container.ConfigContainer.valueMap

Expand Down Expand Up @@ -83,55 +84,80 @@ class VariableSubstitution extends Logging {
val lastHour = dateParameter("${date - 1h |yyyyMMddHH}")
val nextHour = dateParameter("${date + 1h |yyyyMMddHH}")

def substitute(sqlText: String): String = {
/**
 * Resolves the variables in `content` by applying [[doSubstitute]] repeatedly, feeding each
 * result into the next pass.
 *
 * Stops as soon as a pass reports that nothing was replaced, or after 1000 passes, whichever
 * comes first; the text produced by the last pass is returned in both cases.
 */
def substitute(content: String): String = {
  @scala.annotation.tailrec
  def resolve(text: String, pass: Int): String = {
    val (replacedText, replaced) = doSubstitute(text)
    // `pass` counts the doSubstitute calls made so far, including this one.
    if (replaced && pass < 1000) resolve(replacedText, pass + 1) else replacedText
  }

  resolve(content, 1)
}

def doSubstitute(content: String): (String, Boolean) = {
val parsedSqlText = new StringBuilder()
var flag = true
var start = -1
var end = -1
var findParameter = ""
for (pair <- sqlText.zipWithIndex if flag) {
if (pair._1 == '{' && pair._2 > 0 && sqlText.charAt(pair._2 - 1) == '$') {
for (pair <- content.zipWithIndex if flag) {
if (pair._1 == '{' && pair._2 > 0 && content.charAt(pair._2 - 1) == '$') {
start = pair._2
} else if (start > 0 && pair._1 == '}') {
end = pair._2
flag = false
findParameter = sqlText.substring(start + 1, pair._2)
findParameter = content.substring(start + 1, pair._2)
}
}
if ("" != findParameter) {
parsedSqlText.append(sqlText.substring(0, start - 1))
parsedSqlText.append(getParameterValue(findParameter))
parsedSqlText.append(sqlText.substring(end + 1))
substitute(parsedSqlText.toString())
val commaPos = findParameter.indexOf(',')
val (paramKey, defaultValue) =
if (commaPos > 0) {
val (keyToken, valueToken) =
(findParameter.take(commaPos), findParameter.drop(commaPos + 1))
(StringUtil.escapeStringValue(keyToken), Some(StringUtil.escapeStringValue(valueToken)))
} else {
(StringUtil.escapeStringValue(findParameter), None)
}

parsedSqlText.append(content.substring(0, start - 1))
parsedSqlText.append(getParameterValue(paramKey, defaultValue))
parsedSqlText.append(content.substring(end + 1))
(parsedSqlText.toString(), true)
} else {
sqlText
(content, false)
}
}

private def getParameterValue(parameter: String): String = {
parameter match {
private def getParameterValue(paramKey: String,
defaultValue: Option[String] = None): String = {
paramKey.trim match {
case "dt" => dt
case "yesterday" => yesterday
case "tomorrow" => tomorrow
case "hour" => hour
case "lastHour" => lastHour
case "nextHour" => nextHour
case datePattern: String if (parameter.trim.startsWith("date")) =>
case datePattern: String if (paramKey.startsWith("date")) =>
try {
dateParameter(s"$${$datePattern}")
} catch {
case ex: Exception =>
throw new Exception(s"parameter $parameter cannot be parsed", ex)
throw new Exception(s"parameter $paramKey cannot be parsed", ex)
}
case _ => ConfigContainer.getOrElse(parameter,
throw new Exception(s"parameter $parameter cannot be parsed"))
case _ =>
ConfigContainer.getOrElse(paramKey,
defaultValue.getOrElse(
throw new Exception(s"parameter $paramKey cannot be parsed")
)
)
}
}
}

object VariableSubstitution {

def withSubstitution(body: VariableSubstitution => Unit): Unit = {
def withSubstitution[T](body: VariableSubstitution => T): T = {
val substitution = new VariableSubstitution
val originConfigMap = valueMap.get()
val newConfigMap = originConfigMap.map {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2019 Leyantech Ltd. All Rights Reserved.
package org.apache.spark.sql.optimizer

import java.time.ZoneOffset

import org.apache.spark.sql.InsightSuiteUtils.cleanTestHiveData
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.SparkSqlRunner
Expand Down Expand Up @@ -61,6 +63,77 @@ class ExternalRelationRuleSuite extends QueryTest with SQLTestUtils with Matcher
super.afterAll()
}

// NOTE(review): this looks like an exploratory/scratch test. The name "sdf" is not descriptive
// and the body contains no assertions; it only prints query results via show().
test("sdf") {
import spark.implicits._
import scala.util.Random
import scala.math.BigDecimal
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.sql.Timestamp

// Build 2000 rows (ids -1000 until 1000), one per day offset from the current time, and register
// them as temp view "tab". LocalDateTime.now is evaluated twice per row, so the data depends on
// when the test runs.
spark.range(-1000, 1000).map { id =>
val dt = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
val dt_str = LocalDateTime.now.plusDays(id).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
val dt_int = dt_str.toInt
(id.toInt, dt, dt_str, dt_int)
}.toDF("id", "dt", "dt_str", "dt_int")
.createOrReplaceTempView("tab")

// Print the result of a window-function query and the extended plan of a similar filter query.
spark.sql("SELECT * FROM (SELECT current_date(), id, row_number() OVER(PARTITION BY id %10 ORDER BY id ASC) rn from tab) t where rn < 5").show(false)
spark.sql("explain extended SELECT * FROM (SELECT current_date(), id, id % 10 rn from tab) t where rn < 5").show(false)

// Create a parquet table with the same four columns as the temp view.
spark.sql(
s"""create table dim_date (
| id int,
| dt timestamp,
| dt_str string,
| dt_int int
|)
|stored as parquet;
|""".stripMargin)

// Copy the temp view into dim_date.
spark.sql("insert overwrite table dim_date select * from tab")

spark.sql("select * from dim_date ").show(200, false)


// NOTE(review): this statement has no table name after FROM, so it presumably fails to parse;
// confirm which table was intended.
spark.sql("select * from where dt = 20201116 limit 10").show()
// NOTE(review): tb_bot_recommend.dws_recommend_detail is not created anywhere in this test;
// presumably it only exists in an external environment - verify before relying on this test.
spark.sql(s"""select *
|from tb_bot_recommend.dws_recommend_detail t1
|join (
| select dt_str
| from dim_date
| where dt_int = 20201116
|) t2
|on t1.dt = t2.dt_str
|limit 1000
|""".stripMargin).show(10, false)


// Build 999 rows (ids 1 until 1000) with randomized id2/id3 values and register them as temp
// view "tab2"; Random is unseeded, so the generated values differ between runs.
spark.range(1, 1000).map { id =>
val id2 = id + Random.nextInt(10) - 5
val id3 = BigDecimal((id * 100 + Random.nextInt(100)) / 100.0)
val name = s"wankun_$id"
val isMan = id % 2 == 0
val birthday = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
(id.toInt, id2, id3, name, isMan, birthday)
}.toDF("id", "id2", "id3", "name", "isMan", "birthday")
.createOrReplaceTempView("tab2")
// NOTE(review): the table created here is named "tab", the same name as the temp view
// registered at the top of this test - confirm which relation the statements below resolve to.
spark.sql(
s"""create table tab (
| id1 int,
| id2 bigint,
| id3 decimal,
| name string,
| isMan boolean,
| birthday timestamp
|)
|stored as parquet;
|""".stripMargin)
// The view's first column is "id" while the table declares "id1".
spark.sql("insert overwrite table tab select * from tab2")
spark.sql("select * from tab where id1 =4 limit 10").show()
}

test("add external relation using hint") {

val sqlText =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.scalatest.{FunSuite, Matchers}
class VariableSubstitutionSuite extends FunSuite with Matchers {

test("test time variable") {

CollectorContainer :+ (SystemVariables.BATCH_TIME -> LocalDateTime.parse("2019-08-07T13:25:41"))
val substitution = new VariableSubstitution()

Expand Down Expand Up @@ -50,8 +49,9 @@ class VariableSubstitutionSuite extends FunSuite with Matchers {
test("test variable substitution in sql") {
ConfigContainer :+ ("ab_target" -> "after_trade")
CollectorContainer :+ (SystemVariables.BATCH_TIME -> LocalDateTime.parse("2019-08-07T13:25:41"))
val variableSubstitution = new VariableSubstitution()
val sqlText =
val substitution = new VariableSubstitution()

substitution.substitute(
"""
|SELECT count(1)
|FROM tab
Expand All @@ -61,9 +61,7 @@ class VariableSubstitutionSuite extends FunSuite with Matchers {
|AND end_hour = '${date - 24h|hh}'
|AND month = '${date - 24h|MM}'
|AND ab_target = '${ab_target}'
|""".stripMargin
val newSqlText = variableSubstitution.substitute(sqlText)
val expectSqlText =
|""".stripMargin) should equal(
s"""
|SELECT count(1)
|FROM tab
Expand All @@ -73,17 +71,23 @@ class VariableSubstitutionSuite extends FunSuite with Matchers {
|AND end_hour = '01'
|AND month = '08'
|AND ab_target = 'after_trade'
|""".stripMargin
newSqlText should equal(expectSqlText)
|""".stripMargin)
}

test("test nested variable substitution in sql") {
ConfigContainer :+ ("report_days" -> "3")
CollectorContainer :+ (SystemVariables.BATCH_TIME -> LocalDateTime.parse("2019-08-07T13:25:41"))
val variableSubstitution = new VariableSubstitution()
val sqlText = "SELECT * FROM tab WHERE dt = ${date-${report_days}d|yyyyMMdd}"
val newSqlText = variableSubstitution.substitute(sqlText)
val expectSqlText = "SELECT * FROM tab WHERE dt = 20190804"
newSqlText should equal(expectSqlText)
val substitution = new VariableSubstitution()
substitution.substitute("SELECT * FROM tab WHERE dt = ${date-${report_days}d|yyyyMMdd}") should
equal("SELECT * FROM tab WHERE dt = 20190804")
}

// A variable reference of the form ${key, 'default'} or ${key, "default"} must resolve to the
// unquoted default text; this test does not define key1 itself.
test("test parameters with default value") {
  val resolver = new VariableSubstitution()
  val expected = "!set key1 = DEFAULT_VALUE1;"

  // The same default, once in single quotes and once in double quotes.
  val inputs = Seq(
    "!set key1 = ${key1, 'DEFAULT_VALUE1'};",
    "!set key1 = ${key1, \"DEFAULT_VALUE1\"};"
  )

  inputs.foreach { input =>
    resolver.substitute(input) should equal(expected)
  }
}
}

0 comments on commit d0f2973

Please sign in to comment.