
[DP-938][DP-939][DP-943][DP-944][DP-945][DP-946][DP-947][DP-948][DP-951][DP-954] Put back Rename Column Logic With Truncation #3

Merged
merged 7 commits on Oct 18, 2024
4 changes: 2 additions & 2 deletions build.sbt
@@ -6,7 +6,7 @@ import xerial.sbt.Sonatype._
*/

// update this version when picking up a new Flame release
val aiqSparkVersion = "3-3-2-aiq113"
val aiqSparkVersion = "3-3-2-aiq114"

val sparkVersion = aiqSparkVersion.substring(0,5).replace("-", ".")
val scalaVersionStr = "2.12.15"
@@ -20,7 +20,7 @@ val jacksonDatabindVersion = sparkVersion match {
}

// increment this version when making a new release
val sparkConnectorVersion = "4.1.8-aiq1"
val sparkConnectorVersion = "4.1.8-aiq2"

lazy val root = project
.withId("singlestore-spark-connector")
28 changes: 14 additions & 14 deletions src/main/scala/com/singlestore/spark/SQLGen.scala
@@ -136,10 +136,14 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {

case class Raw(override val sql: String) extends SQLChunk

case class Ident(name: String, qualifier: Option[String] = None) extends SQLChunk {
override val sql: String = qualifier
.map(q => s"${SinglestoreDialect.quoteIdentifier(q)}.")
.getOrElse("") + SinglestoreDialect.quoteIdentifier(name)
case class Ident(name: String) extends SQLChunk {
override val sql: String = SinglestoreDialect.quoteIdentifier(name)
// it's not clear that we ever need to fully-qualify references since we do field renames with expr-ids
// If this changes then you can change this code to something like this:
// (and grab the qualifier when creating Ident)
// qualifier
// .map(q => s"${SinglestoreDialect.quoteIdentifier(q)}.")
// .getOrElse("") + SinglestoreDialect.quoteIdentifier(name)
}

case class Relation(
@@ -156,7 +160,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
val isFinal = reader.isFinal

val output = rawOutput.map(
a => AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, Seq[String](name))
a => AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId)
)

override val sql: String = {
@@ -254,7 +258,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
case class Attr(a: Attribute, context: SQLGenContext) extends Chunk {
override def toSQL(fieldMap: Map[ExprId, Attribute]): String = {
val target = fieldMap.getOrElse(a.exprId, a)
context.ident(target.name, target.qualifier.headOption)
context.ident(target.name, target.exprId)
}
}

@@ -289,7 +293,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
def block(j: Joinable): Statement = Raw("(") + j + ")"

def alias(j: Joinable, n: String, e: ExprId, context: SQLGenContext): Statement =
block(j) + "AS" + context.ident(n, None)
block(j) + "AS" + context.ident(n, e)

def func(n: String, j: Joinable): Statement = Raw(n) + block(j)
def func(n: String, j: Joinable*): Statement = Raw(n) + block(j.reduce(_ + "," + _))
@@ -539,12 +543,10 @@

def ident(name: String, exprId: ExprId): String =
if (normalizedExprIdMap.contains(exprId)) {
Ident(s"${name.substring(0, Math.min(name.length, 10))}#${normalizedExprIdMap(exprId)}").sql
Ident(s"c#${normalizedExprIdMap(exprId)}").sql
} else {
Ident(s"${name.substring(0, Math.min(name.length, 10))}#${exprId.id}").sql
Ident(s"c#${exprId.id}").sql
}

def ident(name: String, qualifier: Option[String]): String = Ident(name, qualifier).sql
}

object SQLGenContext {
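
For reference, here is a minimal, standalone sketch of the alias scheme introduced in the ident hunk above (Scala; quoteIdentifier is stubbed with MySQL-style backticks and a plain Long stands in for Spark's ExprId, both assumptions for illustration): every column is aliased to c#<expr-id>, so generated aliases stay short regardless of how long the original column name is.

// Sketch only: stand-ins for SinglestoreDialect.quoteIdentifier and ExprId.
object AliasSketch {
  private def quoteIdentifier(name: String): String =
    s"`${name.replace("`", "``")}`"

  // Mirrors SQLGenContext.ident above: the original column name no longer
  // appears in the alias, only a stable per-expression id.
  def ident(name: String, exprId: Long, normalizedExprIdMap: Map[Long, Long]): String =
    normalizedExprIdMap.get(exprId) match {
      case Some(normalized) => quoteIdentifier(s"c#$normalized")
      case None             => quoteIdentifier(s"c#$exprId")
    }

  def main(args: Array[String]): Unit = {
    // A very long warehouse column name still yields a short alias.
    println(ident("customer_lifetime_value_usd_rolling_90_days", 1234L, Map.empty))        // `c#1234`
    println(ident("customer_lifetime_value_usd_rolling_90_days", 1234L, Map(1234L -> 7L))) // `c#7`
  }
}
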
@@ -647,9 +649,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
if (args.lengthCompare(1) > 0) {
val expressionNames = new mutable.HashSet[String]()
val hasDuplicates = args.exists({
case a @ NamedExpression(name, _) =>
// !expressionNames.add(s"${name}#${a.exprId.id}")
!expressionNames.add(context.ident(name, a.qualifier.headOption))
case a @ NamedExpression(name, _) => !expressionNames.add(s"$name#${a.exprId.id}")
case _ => false
})
if (hasDuplicates) return None
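For context, a rough sketch of the duplicate check above (standalone Scala; NamedExpression/ExprId are replaced by plain tuples as an assumption): two projection items only count as duplicates when both the name and the expression id collide, which is exactly the "select same column twice" case that the tests later in this PR expect to fall back to partial pushdown.

object DuplicateCheckSketch {
  // (name, exprId) pairs stand in for Spark's NamedExpression attributes.
  def hasDuplicates(args: Seq[(String, Long)]): Boolean = {
    val seen = scala.collection.mutable.HashSet[String]()
    args.exists { case (name, exprId) => !seen.add(s"$name#$exprId") }
  }

  def main(args: Array[String]): Unit = {
    // "select first_name, first_name from users": same column, same expr id -> duplicate.
    println(hasDuplicates(Seq(("first_name", 1L), ("first_name", 1L)))) // true
    // "select first_name as a, last_name as a": same alias, different expr ids -> no duplicate,
    // so the query can still be pushed down with c#<exprId> aliases.
    println(hasDuplicates(Seq(("a", 1L), ("a", 2L)))) // false
  }
}
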
13 changes: 7 additions & 6 deletions src/main/scala/com/singlestore/spark/SQLPushdownRule.scala
@@ -31,7 +31,7 @@ class SQLPushdownRule(sparkContext: SparkContext)

// We first need to set a SQLGenContext in every reader.
// This transform is done to ensure that we will generate the same aliases in the same queries.
var ptr, nextPtr = root.transform({
val normalized = root.transform({
case SQLGen.Relation(relation) =>
relation.toLogicalPlan(
relation.output,
@@ -42,21 +42,22 @@
)
})

// In the following lines we used to create Projections with renamed columns for the every query in the Plan.
// In the following lines we create Projections with renamed columns for every query in the Plan.
// These Projections are equivalent to 'select `a` as `a#` ...'.
// If the Warehouse Tables have fewer than 50-100 Columns that is fine because the final SQL
// query string is not too long.
// Per our Customers' Data Models, we need to account for cases where Warehouse
// Tables have more than 50-100 Columns (queries can get to ~130k characters).
// Hence, we comment out the lines below and use fully qualified names for columns in the Plan.
//
// https://github.com/memsql/singlestore-spark-connector/issues/93
//
// Note: We cannot change the following lines without breaking Parallel Read for the Connector

// Second, we need to rename the outputs of each SingleStore relation in the tree. This transform is
// done to ensure that we can handle projections which involve ambiguous column name references.
// var ptr, nextPtr = normalized.transform({
// case SQLGen.Relation(relation) => relation.renameOutput
// })
var ptr, nextPtr = normalized.transform({
case SQLGen.Relation(relation) => relation.renameOutput
})

val expressionExtractor = ExpressionExtractor(context)
val transforms =
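To make the trade-off described in the comment block above concrete, here is a hedged sketch of the SQL shape produced by the restored renameOutput step (table and column names are made up, and the exact projection text is an assumption based on the 'select `a` as `a#` ...' comment; aliases follow the c#<exprId> scheme from SQLGen):

// Sketch: the rename projection wrapped around each SingleStore relation
// (hypothetical table/columns; aliases use the short c#<exprId> scheme).
object RenameProjectionSketch {
  def renameProjection(table: String, cols: Seq[(String, Long)]): String = {
    // Every output column is aliased to a short, expr-id based name,
    // e.g. `id` AS `c#42`, which keeps the query string small even for
    // tables with hundreds of columns.
    val select = cols.map { case (name, exprId) => s"`$name` AS `c#$exprId`" }.mkString(", ")
    s"SELECT $select FROM `$table`"
  }

  def main(args: Array[String]): Unit = {
    println(renameProjection("users", Seq("id" -> 42L, "first_name" -> 43L)))
    // SELECT `id` AS `c#42`, `first_name` AS `c#43` FROM `users`
  }
}
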
19 changes: 13 additions & 6 deletions src/main/scala/com/singlestore/spark/SinglestoreReader.scala
@@ -74,7 +74,14 @@ case class SinglestoreReader(query: String,
with SQLPlan
with DataSourceTelemetryHelpers {

override lazy val schema: StructType = JdbcHelpers.loadSchema(options, query, variables)
private def truncatedQuery: String = {
// Truncating a query that contains single-line comments produces invalid SQL
if (!log.isTraceEnabled()) {
query.stripMargin.linesIterator.map(_.trim).mkString(" ")
} else { query }
}

override lazy val schema: StructType = JdbcHelpers.loadSchema(options, truncatedQuery, variables)

override def sql: String = {
val variablesWithIndex = variables.zipWithIndex
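
A small, self-contained illustration of why the single-line-comment guard in truncatedQuery above matters (the query text is made up, and it is an assumption that trace-level builds keep -- comments in the generated SQL): collapsing a query onto one line leaves everything after the first -- marker commented out.

object FlattenSketch {
  // Same flattening as truncatedQuery: collapse the query text onto one line.
  def flatten(query: String): String =
    query.stripMargin.linesIterator.map(_.trim).mkString(" ")

  def main(args: Array[String]): Unit = {
    val plain =
      """|SELECT `id`
         |FROM `users`
         |WHERE `age` > 21""".stripMargin
    println(flatten(plain)) // SELECT `id` FROM `users` WHERE `age` > 21

    val withComment =
      """|SELECT `id` -- primary key
         |FROM `users`""".stripMargin
    // After flattening, "FROM `users`" lands on the commented line and the
    // statement is no longer valid SQL, hence the isTraceEnabled guard.
    println(flatten(withComment)) // SELECT `id` -- primary key FROM `users`
  }
}
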
@@ -93,15 +100,15 @@
val randHex = Random.nextInt().toHexString
val rdd =
SinglestoreRDD(
query,
truncatedQuery,
variables,
options,
schema,
expectedOutput,
resultMustBeSorted,
expectedOutput
.filter(attr => options.parallelReadRepartitionColumns.contains(attr.name))
.map(attr => context.ident(attr.name, attr.qualifier.headOption)),
.map(attr => context.ident(attr.name, attr.exprId)),
sqlContext.sparkContext,
randHex,
DataSourceTelemetryHelpers.createDataSourceTelemetry(
@@ -141,17 +148,17 @@ case class SinglestoreReader(query: String,
case (Some(p), Some(f)) =>
SQLGen
.select(p)
.from(SQLGen.Relation(rawColumns, this, context.nextAlias(), null))
.from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
.where(f)
.output(rawColumns)
case (Some(p), None) =>
SQLGen
.select(p)
.from(SQLGen.Relation(rawColumns, this, context.nextAlias(), null))
.from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
.output(rawColumns)
case (None, Some(f)) =>
SQLGen.selectAll
.from(SQLGen.Relation(expectedOutput, this, context.nextAlias(), null))
.from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
.where(f)
.output(expectedOutput)
case _ =>
35 changes: 29 additions & 6 deletions src/test/scala/com/singlestore/spark/SQLPushdownTest.scala
@@ -1929,12 +1929,8 @@ class SQLPushdownTest extends IntegrationSuiteBase with BeforeAndAfterEach with
"select * from users full outer join reviews on users.id = reviews.user_id")
}
it("natural join") {
val thrown = intercept[SQLSyntaxErrorException] {
testSingleReadForReadFromLeaves(
"select users.id, rating from users natural join (select user_id as id, rating from reviews)"
)
}
assert(thrown.getMessage.contains("Duplicate column name"))
testSingleReadForReadFromLeaves(
"select users.id, rating from users natural join (select user_id as id, rating from reviews)")
}
it("complex join") {
testSingleReadForReadFromLeaves(
@@ -2332,6 +2328,33 @@ class SQLPushdownTest extends IntegrationSuiteBase with BeforeAndAfterEach with
}
}

describe("same-name column selection") {
it("join two tables which project the same column name") {
testOrderedQuery(
"select * from (select id from users) as a, (select id from movies) as b where a.id = b.id order by a.id")
}
it("select same columns twice via natural join") {
testOrderedQuery("select * from users as a natural join users order by a.id")
}
it("select same column twice from table") {
testQuery("select first_name, first_name from users", expectPartialPushdown = true)
}
it("select same column twice from table with aliases") {
testOrderedQuery("select first_name as a, first_name as a from users order by id")
}
it("select same alias twice (different column) from table") {
testOrderedQuery("select first_name as a, last_name as a from users order by id")
}
it("select same column twice in subquery") {
testQuery("select * from (select first_name, first_name from users) as x",
expectPartialPushdown = true)
}
it("select same column twice from subquery with aliases") {
testOrderedQuery(
"select * from (select first_name as a, first_name as a from users order by id) as x")
}
}

describe("datetimeExpressions") {
describe("DateAdd") {
it("positive num_days") { testQuery("select date_add(birthday, age) from users") }
73 changes: 0 additions & 73 deletions src/test/scala/com/singlestore/spark/SQLPushdownTestAiq.scala
@@ -1761,77 +1761,4 @@
}
}
}

describe("Same-name Column Selection") {
it("join two tables which project the same column name") {
val thrown = intercept[SQLSyntaxErrorException] {
testOrderedQuery(
"""
|select
| *
|from
| (select id from users) as a,
| (select id from movies) as b
|where a.id = b.id
|order by a.id
|""".stripMargin.linesIterator.map(_.trim).mkString(" ")
)
}
assert(thrown.getMessage.contains("Duplicate column name"))
}
it("select same columns twice via natural join") {
val thrown = intercept[SQLSyntaxErrorException] {
testOrderedQuery("select * from users as a natural join users order by a.id")
}
assert(thrown.getMessage.contains("Duplicate column name"))
}
it("select same column twice from table") {
testQuery("select first_name, first_name from users", expectPartialPushdown = true)
}
it("select same column twice from table with aliases") {
val thrown = intercept[AnalysisException] {
testQuery(
"select first_name as a, first_name as a from users",
expectPartialPushdown = true
)
}
assert(
thrown.getMessage.contains(
"Column 'a' does not exist. Did you mean one of the following? [a, a];"
)
)
}
it("select same alias twice (different column) from table") {
val thrown = intercept[AnalysisException] {
testQuery(
"select first_name as a, last_name as a from users",
expectPartialPushdown = true
)
}
assert(
thrown.getMessage.contains(
"Column 'a' does not exist. Did you mean one of the following? [a, a];"
)
)
}
it("select same column twice in subquery") {
testQuery(
"select * from (select first_name, first_name from users) as x",
expectPartialPushdown = true
)
}
it("select same column twice from subquery with aliases") {
val thrown = intercept[AnalysisException] {
testQuery(
"select * from (select first_name as a, first_name as a from users) as x",
expectPartialPushdown = true
)
}
assert(
thrown.getMessage.contains(
"Column 'a' does not exist. Did you mean one of the following? [x.a, x.a];"
)
)
}
}
}