Skip to content

Commit

Permalink
[SPARK-47167][SQL] Add concrete class for JDBC anonymous relation
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

- Introducing new class to avoid creation of anonymous class in JDBCScan::toV1TableScan method

### Why are the changes needed?
This change should provide us better abilities for logging and debugging information about relation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since changes are very simple, no testing was done, build is pre check-in validation tests are enough to show everything is alright.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45259 from urosstan-db/SPARK-47167-add-descriptive-relation.

Authored-by: Uros Stankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
urosstan-db authored and cloud-fan committed Mar 1, 2024
1 parent a5a2822 commit 2981618
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2.jdbc

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.V1Scan
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
Expand All @@ -40,20 +39,17 @@ case class JDBCScan(
override def readSchema(): StructType = prunedSchema

override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
new BaseRelation with TableScan {
override def sqlContext: SQLContext = context
override def schema: StructType = prunedSchema
override def needConversion: Boolean = relation.needConversion
override def buildScan(): RDD[Row] = {
val columnList = if (groupByColumns.isEmpty) {
prunedSchema.map(_.name).toArray
} else {
pushedAggregateColumn
}
relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
pushedLimit, sortOrders, pushedOffset)
}
}.asInstanceOf[T]
JDBCV1RelationFromV2Scan(
context,
prunedSchema,
relation,
pushedPredicates,
pushedAggregateColumn,
groupByColumns,
tableSample,
pushedLimit,
sortOrders,
pushedOffset).asInstanceOf[T]
}

override def description(): String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.jdbc

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

/**
* Relation that is compatible with V1 TableScan,
* but it uses JDBCRelation's buildScan which accepts all v2 pushdowns
*/
case class JDBCV1RelationFromV2Scan(
context: SQLContext,
prunedSchema: StructType,
relation: JDBCRelation,
pushedPredicates: Array[Predicate],
pushedAggregateColumn: Array[String] = Array(),
groupByColumns: Option[Array[String]],
tableSample: Option[TableSampleInfo],
pushedLimit: Int,
sortOrders: Array[String],
pushedOffset: Int) extends BaseRelation with TableScan {
override def sqlContext: SQLContext = context
override def schema: StructType = prunedSchema
override def needConversion: Boolean = relation.needConversion
override def buildScan(): RDD[Row] = {
val columnList = if (groupByColumns.isEmpty) {
prunedSchema.map(_.name).toArray
} else {
pushedAggregateColumn
}

relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
pushedLimit, sortOrders, pushedOffset)
}

override def toString: String = "JDBC v1 Relation from v2 scan"
}

0 comments on commit 2981618

Please sign in to comment.