diff --git a/build.sbt b/build.sbt
index c65f314a..f9fbc00a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,7 @@ import xerial.sbt.Sonatype._
  */

 // update this version when picking up a new Flame release
-val aiqSparkVersion = "3-3-2-aiq113"
+val aiqSparkVersion = "3-3-2-aiq114"
 val sparkVersion = aiqSparkVersion.substring(0,5).replace("-", ".")

 val scalaVersionStr = "2.12.15"
@@ -20,7 +20,7 @@ val jacksonDatabindVersion = sparkVersion match {
 }

 // increment this version when making a new release
-val sparkConnectorVersion = "4.1.8-aiq1"
+val sparkConnectorVersion = "4.1.8-aiq2"

 lazy val root = project
   .withId("singlestore-spark-connector")
diff --git a/src/main/scala/com/singlestore/spark/SQLGen.scala b/src/main/scala/com/singlestore/spark/SQLGen.scala
index de4d600e..3f000c33 100644
--- a/src/main/scala/com/singlestore/spark/SQLGen.scala
+++ b/src/main/scala/com/singlestore/spark/SQLGen.scala
@@ -136,10 +136,14 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {

   case class Raw(override val sql: String) extends SQLChunk

-  case class Ident(name: String, qualifier: Option[String] = None) extends SQLChunk {
-    override val sql: String = qualifier
-      .map(q => s"${SinglestoreDialect.quoteIdentifier(q)}.")
-      .getOrElse("") + SinglestoreDialect.quoteIdentifier(name)
+  case class Ident(name: String) extends SQLChunk {
+    override val sql: String = SinglestoreDialect.quoteIdentifier(name)
+    // it's not clear that we ever need to fully-qualify references since we do field renames with expr-ids
+    // If this changes then you can change this code to something like this:
+    // (and grab the qualifier when creating Ident)
+    //   qualifier
+    //     .map(q => s"${SinglestoreDialect.quoteIdentifier(q)}.")
+    //     .getOrElse("") + SinglestoreDialect.quoteIdentifier(name)
   }

   case class Relation(
@@ -156,7 +160,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
     val isFinal = reader.isFinal

     val output = rawOutput.map(
-      a => AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, Seq[String](name))
+      a => AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId)
     )

     override val sql: String = {
@@ -254,7 +258,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
   case class Attr(a: Attribute, context: SQLGenContext) extends Chunk {
     override def toSQL(fieldMap: Map[ExprId, Attribute]): String = {
       val target = fieldMap.getOrElse(a.exprId, a)
-      context.ident(target.name, target.qualifier.headOption)
+      context.ident(target.name, target.exprId)
     }
   }

@@ -289,7 +293,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
   def block(j: Joinable): Statement = Raw("(") + j + ")"

   def alias(j: Joinable, n: String, e: ExprId, context: SQLGenContext): Statement =
-    block(j) + "AS" + context.ident(n, None)
+    block(j) + "AS" + context.ident(n, e)

   def func(n: String, j: Joinable): Statement = Raw(n) + block(j)
   def func(n: String, j: Joinable*): Statement = Raw(n) + block(j.reduce(_ + "," + _))
@@ -539,12 +543,10 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {

     def ident(name: String, exprId: ExprId): String =
       if (normalizedExprIdMap.contains(exprId)) {
-        Ident(s"${name.substring(0, Math.min(name.length, 10))}#${normalizedExprIdMap(exprId)}").sql
+        Ident(s"c#${normalizedExprIdMap(exprId)}").sql
       } else {
-        Ident(s"${name.substring(0, Math.min(name.length, 10))}#${exprId.id}").sql
+        Ident(s"c#${exprId.id}").sql
       }
-
-    def ident(name: String, qualifier: Option[String]): String = Ident(name, qualifier).sql
   }

   object SQLGenContext {
@@ -647,9 +649,7 @@ object SQLGen extends LazyLogging with DataSourceTelemetryHelpers {
       if (args.lengthCompare(1) > 0) {
         val expressionNames = new mutable.HashSet[String]()
         val hasDuplicates = args.exists({
-          case a @ NamedExpression(name, _) =>
-            // !expressionNames.add(s"${name}#${a.exprId.id}")
-            !expressionNames.add(context.ident(name, a.qualifier.headOption))
+          case a @ NamedExpression(name, _) => !expressionNames.add(s"$name#${a.exprId.id}")
           case _ => false
         })
         if (hasDuplicates) return None
diff --git a/src/main/scala/com/singlestore/spark/SQLPushdownRule.scala b/src/main/scala/com/singlestore/spark/SQLPushdownRule.scala
index 2944d1fc..c76ad1e0 100644
--- a/src/main/scala/com/singlestore/spark/SQLPushdownRule.scala
+++ b/src/main/scala/com/singlestore/spark/SQLPushdownRule.scala
@@ -31,7 +31,7 @@ class SQLPushdownRule(sparkContext: SparkContext)

     // We first need to set a SQLGenContext in every reader.
     // This transform is done to ensure that we will generate the same aliases in the same queries.
-    var ptr, nextPtr = root.transform({
+    val normalized = root.transform({
       case SQLGen.Relation(relation) =>
         relation.toLogicalPlan(
           relation.output,
@@ -42,21 +42,22 @@ class SQLPushdownRule(sparkContext: SparkContext)
         )
     })

-    // In the following lines we used to create Projections with renamed columns for the every query in the Plan.
+    // In the following lines we create Projections with renamed columns for every query in the Plan.
     // These Projections are equivalent to 'select `a` as `a#` ...'.
     // If the Warehouse Tables have less than 50-100 Columns that is fine because the final SQL
     // query string is not too long.
     // Per our Customers Data Models, we need to account for cases where Warehouse
     // Tables have more than 50-100 Columns (queries can get to ~130k characters).
-    // Hence, we comment out the lines below and use fully qualified names for columns in the Plan.
+    //
+    // https://github.com/memsql/singlestore-spark-connector/issues/93
     //
     // Note: We cannot change the following lines without breaking Parallel Read for the Connector
     // Second, we need to rename the outputs of each SingleStore relation in the tree. This transform is
     // done to ensure that we can handle projections which involve ambiguous column name references.
-    // var ptr, nextPtr = normalized.transform({
-    //   case SQLGen.Relation(relation) => relation.renameOutput
-    // })
+    var ptr, nextPtr = normalized.transform({
+      case SQLGen.Relation(relation) => relation.renameOutput
+    })

     val expressionExtractor = ExpressionExtractor(context)
     val transforms =
diff --git a/src/main/scala/com/singlestore/spark/SinglestoreReader.scala b/src/main/scala/com/singlestore/spark/SinglestoreReader.scala
index 738c9a3f..61c35375 100644
--- a/src/main/scala/com/singlestore/spark/SinglestoreReader.scala
+++ b/src/main/scala/com/singlestore/spark/SinglestoreReader.scala
@@ -74,7 +74,14 @@ case class SinglestoreReader(query: String,
     with SQLPlan
     with DataSourceTelemetryHelpers {

-  override lazy val schema: StructType = JdbcHelpers.loadSchema(options, query, variables)
+  private def truncatedQuery: String = {
+    // Truncating a query that contains one-line comments produces invalid SQL
+    if (!log.isTraceEnabled()) {
+      query.stripMargin.linesIterator.map(_.trim).mkString(" ")
+    } else { query }
+  }
+
+  override lazy val schema: StructType = JdbcHelpers.loadSchema(options, truncatedQuery, variables)

   override def sql: String = {
     val variablesWithIndex = variables.zipWithIndex
@@ -93,7 +100,7 @@ case class SinglestoreReader(query: String,
     val randHex = Random.nextInt().toHexString
     val rdd =
       SinglestoreRDD(
-        query,
+        truncatedQuery,
         variables,
         options,
         schema,
@@ -101,7 +108,7 @@ case class SinglestoreReader(query: String,
         resultMustBeSorted,
         expectedOutput
           .filter(attr => options.parallelReadRepartitionColumns.contains(attr.name))
-          .map(attr => context.ident(attr.name, attr.qualifier.headOption)),
+          .map(attr => context.ident(attr.name, attr.exprId)),
         sqlContext.sparkContext,
         randHex,
         DataSourceTelemetryHelpers.createDataSourceTelemetry(
@@ -141,17 +148,17 @@ case class SinglestoreReader(query: String,
       case (Some(p), Some(f)) =>
         SQLGen
           .select(p)
-          .from(SQLGen.Relation(rawColumns, this, context.nextAlias(), null))
+          .from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
          .where(f)
          .output(rawColumns)
       case (Some(p), None) =>
         SQLGen
           .select(p)
-          .from(SQLGen.Relation(rawColumns, this, context.nextAlias(), null))
+          .from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
          .output(rawColumns)
       case (None, Some(f)) =>
         SQLGen.selectAll
-          .from(SQLGen.Relation(expectedOutput, this, context.nextAlias(), null))
+          .from(SQLGen.Relation(Nil, this, context.nextAlias(), null))
          .where(f)
          .output(expectedOutput)
       case _ =>
diff --git a/src/test/scala/com/singlestore/spark/SQLPushdownTest.scala b/src/test/scala/com/singlestore/spark/SQLPushdownTest.scala
index 085b9faa..db86b362 100644
--- a/src/test/scala/com/singlestore/spark/SQLPushdownTest.scala
+++ b/src/test/scala/com/singlestore/spark/SQLPushdownTest.scala
@@ -1929,12 +1929,8 @@ class SQLPushdownTest extends IntegrationSuiteBase with BeforeAndAfterEach with
         "select * from users full outer join reviews on users.id = reviews.user_id")
     }
     it("natural join") {
-      val thrown = intercept[SQLSyntaxErrorException] {
-        testSingleReadForReadFromLeaves(
-          "select users.id, rating from users natural join (select user_id as id, rating from reviews)"
-        )
-      }
-      assert(thrown.getMessage.contains("Duplicate column name"))
+      testSingleReadForReadFromLeaves(
+        "select users.id, rating from users natural join (select user_id as id, rating from reviews)")
     }
     it("complex join") {
       testSingleReadForReadFromLeaves(
@@ -2332,6 +2328,33 @@ class SQLPushdownTest extends IntegrationSuiteBase with BeforeAndAfterEach with
       }
     }

+  describe("same-name column selection") {
it("join two tables which project the same column name") { + testOrderedQuery( + "select * from (select id from users) as a, (select id from movies) as b where a.id = b.id order by a.id") + } + it("select same columns twice via natural join") { + testOrderedQuery("select * from users as a natural join users order by a.id") + } + it("select same column twice from table") { + testQuery("select first_name, first_name from users", expectPartialPushdown = true) + } + it("select same column twice from table with aliases") { + testOrderedQuery("select first_name as a, first_name as a from users order by id") + } + it("select same alias twice (different column) from table") { + testOrderedQuery("select first_name as a, last_name as a from users order by id") + } + it("select same column twice in subquery") { + testQuery("select * from (select first_name, first_name from users) as x", + expectPartialPushdown = true) + } + it("select same column twice from subquery with aliases") { + testOrderedQuery( + "select * from (select first_name as a, first_name as a from users order by id) as x") + } + } + describe("datetimeExpressions") { describe("DateAdd") { it("positive num_days") { testQuery("select date_add(birthday, age) from users") } diff --git a/src/test/scala/com/singlestore/spark/SQLPushdownTestAiq.scala b/src/test/scala/com/singlestore/spark/SQLPushdownTestAiq.scala index 71f054bf..a07bff72 100644 --- a/src/test/scala/com/singlestore/spark/SQLPushdownTestAiq.scala +++ b/src/test/scala/com/singlestore/spark/SQLPushdownTestAiq.scala @@ -1761,77 +1761,4 @@ class SQLPushdownTestAiq extends IntegrationSuiteBase with BeforeAndAfterEach wi } } } - - describe("Same-name Column Selection") { - it("join two tables which project the same column name") { - val thrown = intercept[SQLSyntaxErrorException] { - testOrderedQuery( - """ - |select - | * - |from - | (select id from users) as a, - | (select id from movies) as b - |where a.id = b.id - |order by a.id - |""".stripMargin.linesIterator.map(_.trim).mkString(" ") - ) - } - assert(thrown.getMessage.contains("Duplicate column name")) - } - it("select same columns twice via natural join") { - val thrown = intercept[SQLSyntaxErrorException] { - testOrderedQuery("select * from users as a natural join users order by a.id") - } - assert(thrown.getMessage.contains("Duplicate column name")) - } - it("select same column twice from table") { - testQuery("select first_name, first_name from users", expectPartialPushdown = true) - } - it("select same column twice from table with aliases") { - val thrown = intercept[AnalysisException] { - testQuery( - "select first_name as a, first_name as a from users", - expectPartialPushdown = true - ) - } - assert( - thrown.getMessage.contains( - "Column 'a' does not exist. Did you mean one of the following? [a, a];" - ) - ) - } - it("select same alias twice (different column) from table") { - val thrown = intercept[AnalysisException] { - testQuery( - "select first_name as a, last_name as a from users", - expectPartialPushdown = true - ) - } - assert( - thrown.getMessage.contains( - "Column 'a' does not exist. Did you mean one of the following? 
[a, a];" - ) - ) - } - it("select same column twice in subquery") { - testQuery( - "select * from (select first_name, first_name from users) as x", - expectPartialPushdown = true - ) - } - it("select same column twice from subquery with aliases") { - val thrown = intercept[AnalysisException] { - testQuery( - "select * from (select first_name as a, first_name as a from users) as x", - expectPartialPushdown = true - ) - } - assert( - thrown.getMessage.contains( - "Column 'a' does not exist. Did you mean one of the following? [x.a, x.a];" - ) - ) - } - } }
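
A minimal sketch (not part of the patch above) of the alias change in SQLGen.scala: `SQLGenContext.ident` now emits compact, expr-id-based aliases (`c#<id>`) instead of name-derived ones, which keeps generated SQL short and collision-free even when `renameOutput` projects every column. The backtick-quoting `quoteIdentifier` and the expr-id values below are assumptions for illustration, not the connector's actual implementation:

```scala
// Standalone sketch of the old vs. new alias shapes touched in SQLGen.scala.
// quoteIdentifier and the expr-id values are illustrative assumptions only.
object AliasSketch {
  def quoteIdentifier(name: String): String = s"`${name.replace("`", "``")}`"

  // old scheme: first 10 characters of the column name + "#" + expr-id
  def oldAlias(name: String, exprId: Long): String =
    quoteIdentifier(s"${name.substring(0, Math.min(name.length, 10))}#$exprId")

  // new scheme: constant "c" + "#" + expr-id, so two columns that share a name
  // (e.g. users.id and movies.id) get distinct, short aliases
  def newAlias(exprId: Long): String = quoteIdentifier(s"c#$exprId")

  def main(args: Array[String]): Unit = {
    println(oldAlias("first_name", 42)) // `first_name#42`
    println(newAlias(42))               // `c#42`
    println(newAlias(43))               // `c#43`
  }
}
```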
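Similarly, a small sketch of the query collapsing performed by the new `truncatedQuery` helper in SinglestoreReader.scala; the sample query is made up, and (as the in-code comment notes) collapsing is only safe when the query contains no one-line comments:

```scala
// Sketch of the collapsing expression used by truncatedQuery in the patch;
// the sample query below is a hypothetical input, not connector output.
object TruncatedQuerySketch {
  def collapse(query: String): String =
    query.stripMargin.linesIterator.map(_.trim).mkString(" ")

  def main(args: Array[String]): Unit = {
    val q =
      """|select id, first_name
         |from users
         |where id > 10""".stripMargin
    println(collapse(q)) // select id, first_name from users where id > 10
  }
}
```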